1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ 2/* 3 * Copyright 2017 Couchbase, Inc. 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18#include "config.h" 19 20#include "rocksdb-kvstore.h" 21#include "rocksdb-kvstore_config.h" 22 23#include "ep_time.h" 24 25#include "kvstore_priv.h" 26 27#include <phosphor/phosphor.h> 28#include <platform/sysinfo.h> 29#include <rocksdb/convenience.h> 30#include <rocksdb/filter_policy.h> 31 32#include <stdio.h> 33#include <string.h> 34#include <algorithm> 35#include <gsl/gsl> 36#include <limits> 37#include <thread> 38 39#include "vbucket.h" 40 41namespace rockskv { 42// MetaData is used to serialize and de-serialize metadata respectively when 43// writing a Document mutation request to RocksDB and when reading a Document 44// from RocksDB. 
class MetaData {
public:
    MetaData()
        : deleted(0),
          version(0),
          datatype(0),
          flags(0),
          valueSize(0),
          exptime(0),
          cas(0),
          revSeqno(0),
          bySeqno(0){};
    MetaData(bool deleted,
             uint8_t version,
             uint8_t datatype,
             uint32_t flags,
             uint32_t valueSize,
             time_t exptime,
             uint64_t cas,
             uint64_t revSeqno,
             int64_t bySeqno)
        : deleted(deleted),
          version(version),
          datatype(datatype),
          flags(flags),
          valueSize(valueSize),
          exptime(exptime),
          cas(cas),
          revSeqno(revSeqno),
          bySeqno(bySeqno){};

// The `#pragma pack(1)` directive and the order of members are to keep
// the size of MetaData as small as possible and uniform across different
// platforms.
// NOTE(review): `time_t` is not a fixed-width type, so the packed layout
// is only uniform across platforms where `time_t` has the same size --
// confirm this matches the on-disk compatibility requirements.
#pragma pack(1)
    uint8_t deleted : 1; // 1 if this is a deletion (tombstone), else 0
    uint8_t version : 7; // metadata format version (currently always 0)
    uint8_t datatype;
    uint32_t flags;
    uint32_t valueSize; // size in bytes of the document body
    time_t exptime;
    uint64_t cas;
    uint64_t revSeqno;
    int64_t bySeqno;
#pragma pack()
};
} // namespace rockskv

/**
 * Class representing a document to be persisted in RocksDB.
 *
 * Wraps the Item's metadata (serialized as rockskv::MetaData) and its body,
 * exposing both as rocksdb::Slice for writing into the DB.
 */
class RocksRequest : public IORequest {
public:
    /**
     * Constructor
     *
     * @param item Item instance to be persisted
     * @param callback Persistence Callback
     */
    RocksRequest(const Item& item, MutationRequestCallback& callback)
        : IORequest(item.getVBucketId(),
                    callback,
                    item.isDeleted(),
                    item.getKey()),
          docBody(item.getValue()) {
        // For deletions the expiry-time field is set to the current time
        // (deletion time); otherwise the Item's own expiry time is stored.
        docMeta = rockskv::MetaData(
                item.isDeleted(),
                0,
                item.getDataType(),
                item.getFlags(),
                item.getNBytes(),
                item.isDeleted() ?
                        ep_real_time() : item.getExptime(),
                item.getCas(),
                item.getRevSeqno(),
                item.getBySeqno());
    }

    // Access the serialized document metadata.
    const rockskv::MetaData& getDocMeta() {
        return docMeta;
    }

    // Get a rocksdb::Slice wrapping the Document MetaData
    rocksdb::Slice getDocMetaSlice() {
        return rocksdb::Slice(reinterpret_cast<char*>(&docMeta),
                              sizeof(docMeta));
    }

    // Get a rocksdb::Slice wrapping the Document Body
    rocksdb::Slice getDocBodySlice() {
        // docBody may be empty (e.g. value-less item); return an empty
        // Slice in that case rather than dereferencing.
        const char* data = docBody ? docBody->getData() : nullptr;
        size_t size = docBody ? docBody->valueSize() : 0;
        return rocksdb::Slice(data, size);
    }

private:
    // Fixed-size serialized metadata for this mutation.
    rockskv::MetaData docMeta;
    // Reference-counted document value; may be empty.
    value_t docBody;
};

// RocksDB docs suggest to "Use `rocksdb::DB::DestroyColumnFamilyHandle()` to
// close a column family instead of deleting the column family handle directly"
struct ColumnFamilyDeleter {
    ColumnFamilyDeleter(rocksdb::DB& db) : db(db) {
    }
    void operator()(rocksdb::ColumnFamilyHandle* cfh) {
        db.DestroyColumnFamilyHandle(cfh);
    }

private:
    rocksdb::DB& db;
};
// unique_ptr which closes a ColumnFamilyHandle via the owning DB.
using ColumnFamilyPtr =
        std::unique_ptr<rocksdb::ColumnFamilyHandle, ColumnFamilyDeleter>;

// The 'VBHandle' class is a wrapper around the ColumnFamilyHandles
// for a VBucket.
class VBHandle {
public:
    VBHandle(rocksdb::DB& rdb,
             rocksdb::ColumnFamilyHandle* defaultCFH,
             rocksdb::ColumnFamilyHandle* seqnoCFH,
             uint16_t vbid)
        : rdb(rdb),
          defaultCFH(ColumnFamilyPtr(defaultCFH, rdb)),
          seqnoCFH(ColumnFamilyPtr(seqnoCFH, rdb)),
          vbid(vbid) {
    }

    // Drops both Column Families of this VBucket from the DB.
    // Throws std::runtime_error if RocksDB fails to drop either CF.
    void dropColumnFamilies() {
        // The call to DropColumnFamily() records the drop in the Manifest, but
        // the actual remove will happen when the ColumnFamilyHandle is deleted.
        auto status = rdb.DropColumnFamily(defaultCFH.get());
        if (!status.ok()) {
            throw std::runtime_error(
                    "VBHandle::dropColumnFamilies: DropColumnFamily failed for "
                    "[vbid: " +
                    std::to_string(vbid) + ", CF: default]: " +
                    status.getState());
        }
        status = rdb.DropColumnFamily(seqnoCFH.get());
        if (!status.ok()) {
            throw std::runtime_error(
                    "VBHandle::dropColumnFamilies: DropColumnFamily failed for "
                    "[vbid: " +
                    std::to_string(vbid) + ", CF: seqno]: " +
                    status.getState());
        }
    }

    rocksdb::DB& rdb;
    // Handle for the 'default_<vbid>' CF (document key/value data).
    const ColumnFamilyPtr defaultCFH;
    // Handle for the 'local+seqno_<vbid>' CF.
    const ColumnFamilyPtr seqnoCFH;
    const uint16_t vbid;
};

/**
 * Constructor. Builds the RocksDB DBOptions/ColumnFamilyOptions from the
 * given configuration, opens (or creates) the per-shard DB and loads the
 * persisted state of every VBucket owned by this shard.
 *
 * Throws std::invalid_argument if the configured DBOptions string cannot
 * be parsed.
 */
RocksDBKVStore::RocksDBKVStore(RocksDBKVStoreConfig& configuration)
    : KVStore(configuration),
      vbHandles(configuration.getMaxVBuckets()),
      in_transaction(false),
      scanCounter(0),
      logger(configuration.getLogger()) {
    cachedVBStates.resize(configuration.getMaxVBuckets());
    // Every Write() is synced to disk (rocksdb::WriteOptions::sync).
    writeOptions.sync = true;

    // The RocksDB Options is a set of DBOptions and ColumnFamilyOptions.
    // Together they cover all RocksDB available parameters.
    auto status = rocksdb::GetDBOptionsFromString(
            dbOptions, configuration.getDBOptions(), &dbOptions);
    if (!status.ok()) {
        throw std::invalid_argument(
                std::string("RocksDBKVStore::open: GetDBOptionsFromString "
                            "error: ") +
                status.getState());
    }

    // Set number of background threads - note these are per-environment, so
    // are shared across all DB instances (vBuckets) and all Buckets.
    // A configured value of 0 means "use one thread per available CPU".
    auto lowPri = configuration.getLowPriBackgroundThreads();
    if (lowPri == 0) {
        lowPri = cb::get_available_cpu_count();
    }
    rocksdb::Env::Default()->SetBackgroundThreads(lowPri, rocksdb::Env::LOW);

    auto highPri = configuration.getHighPriBackgroundThreads();
    if (highPri == 0) {
        highPri = cb::get_available_cpu_count();
    }
    rocksdb::Env::Default()->SetBackgroundThreads(highPri, rocksdb::Env::HIGH);

    dbOptions.create_if_missing = true;

    // We use EventListener to set the correct ThreadLocal engine in the
    // ObjectRegistry for the RocksDB Flusher and Compactor threads. This
    // allows the memory tracker to track allocations and deallocations against
    // the appropriate bucket.
    auto eventListener =
            std::make_shared<EventListener>(ObjectRegistry::getCurrentEngine());
    dbOptions.listeners.emplace_back(eventListener);

    // Enable Statistics if 'Statistics::stat_level_' is provided by the
    // configuration. We create a statistics object and pass to the multiple
    // DBs managed by the same KVStore. Then the statistics object will contain
    // aggregated values for all those DBs. Note that some stats are undefined
    // and have no meaningful information across multiple DBs (e.g.,
    // "rocksdb.sequence.number").
    if (!configuration.getStatsLevel().empty()) {
        dbOptions.statistics = rocksdb::CreateDBStatistics();
        dbOptions.statistics->stats_level_ =
                getStatsLevel(configuration.getStatsLevel());
    }

    // Apply the environment rate limit for Flush and Compaction
    dbOptions.rate_limiter = configuration.getEnvRateLimiter();

    // Allocate the per-shard Block Cache
    if (configuration.getBlockCacheRatio() > 0.0) {
        auto blockCacheQuota = configuration.getBucketQuota() *
                               configuration.getBlockCacheRatio();
        // Keeping default settings for:
        // num_shard_bits = -1 (automatically determined)
        // strict_capacity_limit = false (do not fail insert when cache is full)
        blockCache = rocksdb::NewLRUCache(
                blockCacheQuota / configuration.getMaxShards(),
                -1 /*num_shard_bits*/,
                false /*strict_capacity_limit*/,
                configuration.getBlockCacheHighPriPoolRatio());
    }
    // Configure all the Column Families: start from the baseline options for
    // each CF type, then overlay any user-provided CF/BlockBasedTable options.
    const auto& cfOptions = configuration.getCFOptions();
    const auto& bbtOptions = configuration.getBBTOptions();
    defaultCFOptions = getBaselineDefaultCFOptions();
    seqnoCFOptions = getBaselineSeqnoCFOptions();
    applyUserCFOptions(defaultCFOptions, cfOptions, bbtOptions);
    applyUserCFOptions(seqnoCFOptions, cfOptions, bbtOptions);

    // Open the DB and load the ColumnFamilyHandle for all the
    // existing Column Families (populates the 'vbHandles' vector)
    openDB();

    // Calculate and apply the correct write_buffer_size for all Column
    // Families. The Memtable size of each CF depends on the count of existing
    // CFs in DB (besides other things). Thus, this must be called after
    // 'openDB' (so that all the existing CFs have been loaded).
    applyMemtablesQuota(std::lock_guard<std::mutex>(vbhMutex));

    // Read persisted VBs state
    // NOTE(review): 'const auto vbh' copies the shared_ptr (refcount bump)
    // on every iteration; 'const auto&' would avoid it.
    for (const auto vbh : vbHandles) {
        if (vbh) {
            readVBState(*vbh);
            // Update stats
            ++st.numLoadedVb;
        }
    }
}

RocksDBKVStore::~RocksDBKVStore() {
    // Guarantees that all the ColumnFamilyHandles for the existing VBuckets
    // are released before 'rdb' is deleted. From RocksDB docs:
    // "Before delete DB, you have to close All column families by calling
    // DestroyColumnFamilyHandle() with all the handles."
    vbHandles.clear();
    in_transaction = false;
}

// Opens (or creates) the per-shard RocksDB instance, re-attaching the
// ColumnFamilyHandles for all the Column Families already on disk.
// Throws std::runtime_error on any unexpected RocksDB failure.
void RocksDBKVStore::openDB() {
    auto dbname = getDBSubdir();
    createDataDir(dbname);

    std::vector<std::string> cfs;
    auto status = rocksdb::DB::ListColumnFamilies(dbOptions, dbname, &cfs);
    if (!status.ok()) {
        // If ListColumnFamilies failed because the DB does not exist,
        // then it will be created the first time we call 'rocksdb::DB::Open'.
        // Else, we throw an error if ListColumnFamilies failed for any other
        // unexpected reason.
        if (!(status.code() == rocksdb::Status::kIOError &&
              std::string(status.getState())
                              .find("No such file or directory") !=
                      std::string::npos)) {
            throw std::runtime_error(
                    "RocksDBKVStore::openDB: ListColumnFamilies failed for DB "
                    "'" +
                    dbname + "': " + status.getState());
        }
    }

    // We need to pass a ColumnFamilyDescriptor for every existing CF.
    // We populate 'cfDescriptors' so that it results in a vector
    // containing packed CFs for every VBuckets, e.g. with
    // MaxShards=4:
    //     cfDescriptors[0] = default_0
    //     cfDescriptors[1] = seqno_0
    //     cfDescriptors[2] = default_4
    //     cfDescriptors[3] = seqno_4
    //     ..
    // That helps us in populating 'vbHandles' later, because after
    // 'rocksdb::DB::Open' handles[i] will be the handle that we will use
    // to operate on the ColumnFamily at cfDescriptors[i].
    std::vector<rocksdb::ColumnFamilyDescriptor> cfDescriptors;
    for (uint16_t vbid = 0; vbid < configuration.getMaxVBuckets(); vbid++) {
        // Only consider VBuckets belonging to this shard.
        if ((vbid % configuration.getMaxShards()) ==
            configuration.getShardId()) {
            std::string defaultCF = "default_" + std::to_string(vbid);
            std::string seqnoCF = "local+seqno_" + std::to_string(vbid);
            if (std::find(cfs.begin(), cfs.end(), defaultCF) != cfs.end()) {
                // The two CFs of a VBucket must exist as a pair; a missing
                // seqno CF means the DB is corrupt/inconsistent.
                if (std::find(cfs.begin(), cfs.end(), seqnoCF) == cfs.end()) {
                    throw std::logic_error("RocksDBKVStore::openDB: DB '" +
                                           dbname +
                                           "' is in inconsistent state: CF " +
                                           seqnoCF + " not found.");
                }
                cfDescriptors.emplace_back(defaultCF, defaultCFOptions);
                cfDescriptors.emplace_back(seqnoCF, seqnoCFOptions);
            }
        }
    }

    // TODO: The RocksDB built-in 'default' CF always exists, need to check if
    // we can drop it.
    cfDescriptors.emplace_back(rocksdb::kDefaultColumnFamilyName,
                               rocksdb::ColumnFamilyOptions());
    std::vector<rocksdb::ColumnFamilyHandle*> handles;
    rocksdb::DB* db;
    status = rocksdb::DB::Open(dbOptions, dbname, cfDescriptors, &handles, &db);
    if (!status.ok()) {
        throw std::runtime_error(
                "RocksDBKVStore::openDB: Open failed for database '" + dbname +
                "': " + status.getState());
    }
    rdb.reset(db);

    // The way we populated 'cfDescriptors' guarantees that: if 'cfDescriptors'
    // contains more than only the RocksDB 'default' CF (i.e.,
    // '(cfDescriptors.size() - 1) > 0') then 'cfDescriptors[i]' and
    // 'cfDescriptors[i+1]' are respectively the 'default_' and 'seqno'_ CFs
    // for a certain VBucket.
    for (uint16_t i = 0; i < (cfDescriptors.size() - 1); i += 2) {
        // Note: any further sanity-check is redundant as we will have always
        // 'cf = "default_<vbid>"'.
        const auto& cf = cfDescriptors[i].name;
        // Parse the vbid out of the "default_<vbid>" CF name.
        uint16_t vbid = std::stoi(cf.substr(8));
        vbHandles[vbid] = std::make_shared<VBHandle>(
                *rdb, handles[i], handles[i + 1], vbid);
    }

    // We need to release the ColumnFamilyHandle for the built-in 'default' CF
    // here, as it is not managed by any VBHandle.
    rdb->DestroyColumnFamilyHandle(handles.back());
}

// Returns the VBHandle for 'vbid', creating the VBucket's Column Families
// on the DB first if they do not exist yet.
// Throws std::runtime_error if RocksDB fails to create (or roll back) the CFs.
std::shared_ptr<VBHandle> RocksDBKVStore::getVBHandle(uint16_t vbid) {
    std::lock_guard<std::mutex> lg(vbhMutex);
    if (vbHandles[vbid]) {
        return vbHandles[vbid];
    }

    // If the VBHandle for vbid does not exist it means that we need to create
    // the VBucket, i.e. we need to create the set of CFs on DB for vbid
    std::vector<rocksdb::ColumnFamilyDescriptor> cfDescriptors;
    auto vbid_ = std::to_string(vbid);
    cfDescriptors.emplace_back("default_" + vbid_, defaultCFOptions);
    cfDescriptors.emplace_back("local+seqno_" + vbid_, seqnoCFOptions);

    std::vector<rocksdb::ColumnFamilyHandle*> handles;
    auto status = rdb->CreateColumnFamilies(cfDescriptors, &handles);
    if (!status.ok()) {
        // Roll back: drop any CF that was created before the failure.
        for (auto* cfh : handles) {
            status = rdb->DropColumnFamily(cfh);
            if (!status.ok()) {
                throw std::runtime_error(
                        "RocksDBKVStore::getVBHandle: DropColumnFamily failed "
                        "for CF " +
                        cfh->GetName() + ": " + status.getState());
            }
        }
        throw std::runtime_error(
                "RocksDBKVStore::getVBHandle: CreateColumnFamilies failed for "
                "vbid " +
                std::to_string(vbid) + ": " + status.getState());
    }

    vbHandles[vbid] =
            std::make_shared<VBHandle>(*rdb, handles[0], handles[1], vbid);

    // The number of VBuckets has increased, we need to re-balance the
    // Memtables Quota among the CFs of existing VBuckets.
    applyMemtablesQuota(lg);

    return vbHandles[vbid];
}

// Returns the path of the per-shard RocksDB directory,
// i.e. "<dbname>/rocksdb.<shardId>".
std::string RocksDBKVStore::getDBSubdir() {
    return configuration.getDBName() + "/rocksdb."
            + std::to_string(configuration.getShardId());
}

// Starts a transaction; all subsequent set()/del() calls are buffered in
// 'pendingReqs' until commit() or rollback().
// Throws std::invalid_argument if txCtx is null.
bool RocksDBKVStore::begin(std::unique_ptr<TransactionContext> txCtx) {
    if (!txCtx) {
        throw std::invalid_argument("RocksDBKVStore::begin: txCtx is null");
    }
    in_transaction = true;
    transactionCtx = std::move(txCtx);
    return in_transaction;
}

// Flushes all the pending mutations of the current transaction to disk and
// invokes the per-request persistence callbacks.
// Returns true on success (or if there was nothing to commit).
bool RocksDBKVStore::commit(const Item* collectionsManifest) {
    // This behaviour is to replicate the one in Couchstore.
    // If `commit` is called when not in transaction, just return true.
    if (!in_transaction) {
        return true;
    }

    if (pendingReqs.size() == 0) {
        in_transaction = false;
        return true;
    }

    // Swap `pendingReqs` with the temporary `commitBatch` so that we can
    // shorten the scope of the lock.
    std::vector<std::unique_ptr<RocksRequest>> commitBatch;
    {
        std::lock_guard<std::mutex> lock(writeMutex);
        std::swap(pendingReqs, commitBatch);
    }

    bool success = true;
    // All requests in a batch belong to the same VBucket.
    auto vbid = commitBatch[0]->getVBucketId();

    // Flush all documents to disk
    auto status = saveDocs(vbid, collectionsManifest, commitBatch);
    if (!status.ok()) {
        logger.log(EXTENSION_LOG_WARNING,
                   "RocksDBKVStore::commit: saveDocs error:%d, "
                   "vb:%" PRIu16,
                   status.code(),
                   vbid);
        success = false;
    }

    commitCallback(status, commitBatch);

    // This behaviour is to replicate the one in Couchstore.
    // Set `in_transaction = false` only if `commit` is successful.
    if (success) {
        in_transaction = false;
        transactionCtx.reset();
    }

    return success;
}

// Maps a rocksdb::Status code to the ep-engine mutation status codes.
// Throws std::runtime_error for any unexpected RocksDB error code.
static int getMutationStatus(rocksdb::Status status) {
    switch (status.code()) {
    case rocksdb::Status::Code::kOk:
        return MUTATION_SUCCESS;
    case rocksdb::Status::Code::kNotFound:
        // This return value causes ep-engine to drop the failed flush
        return DOC_NOT_FOUND;
    case rocksdb::Status::Code::kBusy:
        // This return value causes ep-engine to keep re-queueing the failed
        // flush
        return MUTATION_FAILED;
    default:
        throw std::runtime_error(
                std::string("getMutationStatus: RocksDB error:") +
                std::string(status.getState()));
    }
}

// Updates the ep stats for every request in the batch and invokes its
// set/del persistence callback with the outcome of the commit.
void RocksDBKVStore::commitCallback(
        rocksdb::Status status,
        const std::vector<std::unique_ptr<RocksRequest>>& commitBatch) {
    for (const auto& request : commitBatch) {
        auto dataSize = request->getDocMetaSlice().size() +
                        request->getDocBodySlice().size();
        const auto& key = request->getKey();
        /* update ep stats */
        ++st.io_num_write;
        st.io_write_bytes += (key.size() + dataSize);

        auto rv = getMutationStatus(status);
        if (request->isDelete()) {
            if (status.code()) {
                ++st.numDelFailure;
            } else {
                st.delTimeHisto.add(request->getDelta() / 1000);
            }
            if (rv != -1) {
                // TODO: Should set `rv` to 1 or 0 depending on if this is a
                // delete to an existing (1) or non-existing (0) item. However,
                // to achieve this we would need to perform a Get to RocksDB
                // which is costly. For now just assume that the item did exist.
                rv = 1;
            }
            request->getDelCallback()->callback(*transactionCtx, rv);
        } else {
            if (status.code()) {
                ++st.numSetFailure;
            } else {
                st.writeTimeHisto.add(request->getDelta() / 1000);
                st.writeSizeHisto.add(dataSize + key.size());
            }
            // TODO: Should set `mr.second` to true or false depending on if
            // this is an insertion (true) or an update of an existing item
            // (false). However, to achieve this we would need to perform a Get
            // to RocksDB which is costly. For now just assume that the item
            // did not exist.
            mutation_result mr = std::make_pair(1, true);
            request->getSetCallback()->callback(*transactionCtx, mr);
        }
    }
}

// Aborts the current transaction (if any), discarding its context.
// Note: does not clear 'pendingReqs' here.
void RocksDBKVStore::rollback() {
    if (in_transaction) {
        in_transaction = false;
        transactionCtx.reset();
    }
}

// Returns raw pointers into 'cachedVBStates'; entries may be null for
// VBuckets with no cached state. Pointers are owned by this KVStore.
std::vector<vbucket_state*> RocksDBKVStore::listPersistedVbuckets() {
    std::vector<vbucket_state*> result;
    for (const auto& vb : cachedVBStates) {
        result.emplace_back(vb.get());
    }
    return result;
}

// Queues a mutation for the current transaction.
// Throws std::logic_error if called outside a transaction.
void RocksDBKVStore::set(const Item& item,
                         Callback<TransactionContext, mutation_result>& cb) {
    if (!in_transaction) {
        throw std::logic_error(
                "RocksDBKVStore::set: in_transaction must be true to perform a "
                "set operation.");
    }
    MutationRequestCallback callback;
    callback.setCb = &cb;
    pendingReqs.push_back(std::make_unique<RocksRequest>(item, callback));
}

GetValue RocksDBKVStore::get(const StoredDocKey& key,
                             uint16_t vb,
                             bool fetchDelete) {
    return getWithHeader(nullptr, key, vb, GetMetaOnly::No, fetchDelete);
}

// Fetches a document from the VBucket's 'default' CF.
// Returns ENGINE_KEY_ENOENT status if the key is not found.
GetValue RocksDBKVStore::getWithHeader(void* dbHandle,
                                       const StoredDocKey& key,
                                       uint16_t vb,
                                       GetMetaOnly getMetaOnly,
                                       bool fetchDelete) {
    std::string value;
    const auto vbh = getVBHandle(vb);
    // TODO RDB: use a PinnableSlice to avoid some memcpy
    rocksdb::Slice keySlice = getKeySlice(key);
    rocksdb::Status s =
            rdb->Get(
            rocksdb::ReadOptions(), vbh->defaultCFH.get(), keySlice, &value);
    if (!s.ok()) {
        // NOTE(review): prefer nullptr over NULL here.
        return GetValue{NULL, ENGINE_KEY_ENOENT};
    }
    return makeGetValue(vb, key, value, getMetaOnly);
}

// Background-fetch: resolves every queued key in 'itms' with an individual
// Get against the VBucket's 'default' CF.
void RocksDBKVStore::getMulti(uint16_t vb, vb_bgfetch_queue_t& itms) {
    // TODO RDB: RocksDB supports a multi get which we should use here.
    for (auto& it : itms) {
        auto& key = it.first;
        rocksdb::Slice keySlice = getKeySlice(key);
        std::string value;
        const auto vbh = getVBHandle(vb);
        rocksdb::Status s = rdb->Get(rocksdb::ReadOptions(),
                                     vbh->defaultCFH.get(),
                                     keySlice,
                                     &value);
        if (s.ok()) {
            // Share the single GetValue with every waiting fetch request.
            it.second.value =
                    makeGetValue(vb, key, value, it.second.isMetaOnly);
            GetValue* rv = &it.second.value;
            for (auto& fetch : it.second.bgfetched_list) {
                fetch->value = rv;
            }
        } else {
            for (auto& fetch : it.second.bgfetched_list) {
                fetch->value->setStatus(ENGINE_KEY_ENOENT);
            }
        }
    }
}

void RocksDBKVStore::reset(uint16_t vbucketId) {
    // TODO RDB: Implement.
}

// Queues a deletion for the current transaction.
// Throws std::invalid_argument if the Item is not marked deleted, and
// std::logic_error if called outside a transaction.
void RocksDBKVStore::del(const Item& item,
                         Callback<TransactionContext, int>& cb) {
    if (!item.isDeleted()) {
        throw std::invalid_argument(
                "RocksDBKVStore::del item to delete is not marked as deleted.");
    }
    if (!in_transaction) {
        throw std::logic_error(
                "RocksDBKVStore::del: in_transaction must be true to perform a "
                "delete operation.");
    }
    // TODO: Deleted items remain as tombstones, but are not yet expired,
    // they will accumulate forever.
    MutationRequestCallback callback;
    callback.delCb = &cb;
    pendingReqs.push_back(std::make_unique<RocksRequest>(item, callback));
}

// Deletes a VBucket by dropping its Column Families from the DB.
// Blocks (spinning) until this thread holds the only reference to the
// VBHandle, so the expensive drop never runs on a front-end thread.
void RocksDBKVStore::delVBucket(uint16_t vbid, uint64_t vb_version) {
    std::lock_guard<std::mutex> lg1(writeMutex);
    std::lock_guard<std::mutex> lg2(vbhMutex);

    if (!vbHandles[vbid]) {
        logger.log(EXTENSION_LOG_WARNING,
                   "RocksDBKVStore::delVBucket: VBucket not found, vb:%" PRIu16,
                   vbid);
        return;
    }

    // 'vbHandles' stores a shared_ptr to VBHandle for each VBucket . The
    // ownership of each pointer is shared among multiple threads performing
    // different operations (e.g., 'get' and 'commit').
    // We want to call 'DropColumnFamily' here rather than in other threads
    // because it is an expensive, IO-intensive operation and we do not want
    // it to cause another thread (possibly a front-end one) from being blocked
    // performing the drop.
    // So, the thread executing 'delVBucket' spins until it is the exclusive
    // owner of the shared_ptr (i.e., other concurrent threads like 'commit'
    // have completed and do not own any copy of the shared_ptr).
    {
        std::shared_ptr<VBHandle> sharedPtr;
        std::swap(vbHandles[vbid], sharedPtr);
        // NOTE(review): shared_ptr::unique() is deprecated in C++17
        // (use use_count() == 1); also note use_count is approximate in
        // multithreaded code -- confirm this spin is race-free.
        while (!sharedPtr.unique()) {
            std::this_thread::sleep_for(std::chrono::microseconds(100));
        }
        // Drop all the CF for vbid.
        sharedPtr->dropColumnFamilies();
    }

    // The number of VBuckets has decreased, we need to re-balance the
    // Memtables Quota among the CFs of existing VBuckets.
    applyMemtablesQuota(lg2);
}

// Persists the given vbucket_state (according to 'options') and updates the
// cached copy. Returns false if persisting the state fails.
bool RocksDBKVStore::snapshotVBucket(uint16_t vbucketId,
                                     const vbucket_state& vbstate,
                                     VBStatePersist options) {
    // TODO RDB: Refactor out behaviour common to this and CouchKVStore
    auto start = ProcessClock::now();

    if (updateCachedVBState(vbucketId, vbstate) &&
        (options == VBStatePersist::VBSTATE_PERSIST_WITHOUT_COMMIT ||
         options == VBStatePersist::VBSTATE_PERSIST_WITH_COMMIT)) {
        const auto vbh = getVBHandle(vbucketId);
        rocksdb::WriteBatch batch;
        auto status = saveVBStateToBatch(*vbh, vbstate, batch);
        if (!status.ok()) {
            logger.log(EXTENSION_LOG_WARNING,
                       "RocksDBKVStore::snapshotVBucket: saveVBStateToBatch() "
                       "failed state:%s vb:%" PRIu16 " :%s",
                       VBucket::toString(vbstate.state),
                       vbucketId,
                       status.getState());
            return false;
        }
        status = rdb->Write(writeOptions, &batch);
        if (!status.ok()) {
            logger.log(EXTENSION_LOG_WARNING,
                       "RocksDBKVStore::snapshotVBucket: Write() "
                       "failed state:%s vb:%" PRIu16 " :%s",
                       VBucket::toString(vbstate.state),
                       vbucketId,
                       status.getState());
            return false;
        }
    }

    LOG(EXTENSION_LOG_DEBUG,
        "RocksDBKVStore::snapshotVBucket: Snapshotted vbucket:%" PRIu16
        " state:%s",
        vbucketId,
        vbstate.toJSON().c_str());

    st.snapshotHisto.add(std::chrono::duration_cast<std::chrono::microseconds>(
            ProcessClock::now() - start));

    return true;
}

bool RocksDBKVStore::snapshotStats(const std::map<std::string, std::string>&) {
    // TODO RDB: Implement
    return true;
}

void RocksDBKVStore::destroyInvalidVBuckets(bool) {
    // TODO RDB: implement
}

size_t RocksDBKVStore::getNumShards() {
    return configuration.getMaxShards();
}

// Looks up a single named statistic; writes the result into 'value' and
// returns true if the stat is known, false otherwise.
bool RocksDBKVStore::getStat(const char* name_, size_t& value) {
    std::string name(name_);

    // Memory Usage
    if (name == "kMemTableTotal") {
        return
                getStatFromMemUsage(rocksdb::MemoryUtil::kMemTableTotal, value);
    } else if (name == "kMemTableUnFlushed") {
        return getStatFromMemUsage(rocksdb::MemoryUtil::kMemTableUnFlushed,
                                   value);
    } else if (name == "kTableReadersTotal") {
        return getStatFromMemUsage(rocksdb::MemoryUtil::kTableReadersTotal,
                                   value);
    } else if (name == "kCacheTotal") {
        return getStatFromMemUsage(rocksdb::MemoryUtil::kCacheTotal, value);
    }

    // MemTable Size per Column Family
    else if (name == "default_kSizeAllMemTables") {
        return getStatFromProperties(ColumnFamily::Default,
                                     rocksdb::DB::Properties::kSizeAllMemTables,
                                     value);
    } else if (name == "seqno_kSizeAllMemTables") {
        return getStatFromProperties(ColumnFamily::Seqno,
                                     rocksdb::DB::Properties::kSizeAllMemTables,
                                     value);
    }

    // Block Cache hit/miss
    else if (name == "rocksdb.block.cache.hit") {
        return getStatFromStatistics(rocksdb::Tickers::BLOCK_CACHE_HIT, value);
    } else if (name == "rocksdb.block.cache.miss") {
        return getStatFromStatistics(rocksdb::Tickers::BLOCK_CACHE_MISS, value);
    } else if (name == "rocksdb.block.cache.data.hit") {
        return getStatFromStatistics(rocksdb::Tickers::BLOCK_CACHE_DATA_HIT,
                                     value);
    } else if (name == "rocksdb.block.cache.data.miss") {
        return getStatFromStatistics(rocksdb::Tickers::BLOCK_CACHE_DATA_MISS,
                                     value);
    } else if (name == "rocksdb.block.cache.index.hit") {
        return getStatFromStatistics(rocksdb::Tickers::BLOCK_CACHE_INDEX_HIT,
                                     value);
    } else if (name == "rocksdb.block.cache.index.miss") {
        return getStatFromStatistics(rocksdb::Tickers::BLOCK_CACHE_INDEX_MISS,
                                     value);
    } else if (name == "rocksdb.block.cache.filter.hit") {
        return getStatFromStatistics(rocksdb::Tickers::BLOCK_CACHE_FILTER_HIT,
                                     value);
    } else if (name == "rocksdb.block.cache.filter.miss") {
        return getStatFromStatistics(rocksdb::Tickers::BLOCK_CACHE_FILTER_MISS,
                                     value);
    }

    // Disk Usage per Column Family
    else if (name == "default_kTotalSstFilesSize") {
        return getStatFromProperties(
                ColumnFamily::Default,
                rocksdb::DB::Properties::kTotalSstFilesSize,
                value);
    } else if (name == "seqno_kTotalSstFilesSize") {
        return getStatFromProperties(
                ColumnFamily::Seqno,
                rocksdb::DB::Properties::kTotalSstFilesSize,
                value);
    }

    // Scan stats: counters incremented by scan() for every seqno-index hit
    // and for hits on stale (superseded/missing) seqno mappings.
    else if (name == "scan_totalSeqnoHits") {
        value = scanTotalSeqnoHits.load();
        return true;
    } else if (name == "scan_oldSeqnoHits") {
        value = scanOldSeqnoHits.load();
        return true;
    }

    // Unrecognised stat name.
    return false;
}

// Advertises which storage features this KVStore implementation supports to
// the engine layer.
StorageProperties RocksDBKVStore::getStorageProperties(void) {
    StorageProperties rv(StorageProperties::EfficientVBDump::Yes,
                         StorageProperties::EfficientVBDeletion::Yes,
                         StorageProperties::PersistedDeletion::No,
                         // TODO RDB: Not strictly true, multiGet
                         // does not yet use the underlying multi get
                         // of RocksDB
                         StorageProperties::EfficientGet::Yes,
                         StorageProperties::ConcurrentWriteCompact::Yes);
    return rv;
}

// Collects the set of rocksdb::Cache instances in use by this store, for
// passing to MemoryUtil::GetApproximateMemoryUsageByType (see
// getStatFromMemUsage below).
std::unordered_set<const rocksdb::Cache*> RocksDBKVStore::getCachePointers() {
    std::unordered_set<const rocksdb::Cache*> cache_set;

    // TODO: Cache from DBImpl. The 'std::shared_ptr<Cache>
    // table_cache_' pointer is not exposed through the 'DB' interface

    // Cache from DBOptions
    // Note: we do not use the 'row_cache' currently.
    // NOTE(review): if row_cache is unset, this inserts nullptr into the set;
    // presumably the MemoryUtil API tolerates that - confirm.
    cache_set.insert(rdb->GetDBOptions().row_cache.get());

    // Cache from table factories.
    addCFBlockCachePointers(defaultCFOptions, cache_set);
    addCFBlockCachePointers(seqnoCFOptions, cache_set);

    return cache_set;
}

// Adds the block caches referenced by one Column Family's table factory into
// 'cache_set'.
// NOTE(review): the static_cast assumes the table factory is a
// BlockBasedTableFactory; applyUserCFOptions installs one via
// NewBlockBasedTableFactory, but confirm no other factory can be configured.
void RocksDBKVStore::addCFBlockCachePointers(
        const rocksdb::ColumnFamilyOptions& cfOptions,
        std::unordered_set<const rocksdb::Cache*>& cache_set) {
    if (cfOptions.table_factory) {
        auto* table_options = cfOptions.table_factory->GetOptions();
        auto* bbt_options =
                static_cast<rocksdb::BlockBasedTableOptions*>(table_options);
        cache_set.insert(bbt_options->block_cache.get());
        cache_set.insert(bbt_options->block_cache_compressed.get());
    }
}

// Maps a configuration string to the corresponding rocksdb::StatsLevel enum.
// Throws std::invalid_argument for an unrecognised level name.
rocksdb::StatsLevel RocksDBKVStore::getStatsLevel(
        const std::string& stats_level) {
    if (stats_level == "kExceptDetailedTimers") {
        return rocksdb::StatsLevel::kExceptDetailedTimers;
    } else if (stats_level == "kExceptTimeForMutex") {
        return rocksdb::StatsLevel::kExceptTimeForMutex;
    } else if (stats_level == "kAll") {
        return rocksdb::StatsLevel::kAll;
    } else {
        throw std::invalid_argument(
                std::string("RocksDBKVStore::getStatsLevel: stats_level: '") +
                stats_level + std::string("'"));
    }
}

// Wraps a DocKey's raw bytes in a (non-owning) rocksdb::Slice; the DocKey must
// outlive the returned Slice.
rocksdb::Slice RocksDBKVStore::getKeySlice(const DocKey& key) {
    return rocksdb::Slice(reinterpret_cast<const char*>(key.data()),
                          key.size());
}

// Wraps the raw bytes of a seqno in a (non-owning) rocksdb::Slice; the pointee
// must outlive the returned Slice.
rocksdb::Slice RocksDBKVStore::getSeqnoSlice(const int64_t* seqno) {
    return rocksdb::Slice(reinterpret_cast<const char*>(seqno), sizeof(*seqno));
}

// Decodes an 8-byte Slice (as produced by getSeqnoSlice) back into an int64_t.
// NOTE(review): plain 'assert' is compiled out in release builds (unlike the
// cb_assert used in scan()) - a short slice would then be read out-of-bounds.
int64_t RocksDBKVStore::getNumericSeqno(const rocksdb::Slice& seqnoSlice) {
    assert(seqnoSlice.size() == sizeof(int64_t));
    int64_t seqno;
    std::memcpy(&seqno, seqnoSlice.data(), seqnoSlice.size());
    return seqno;
}

// De-serializes a stored document (MetaData header followed by the value
// bytes) into an Item. If getMetaOnly == Yes, or the stored value is empty,
// the Item carries no value.
std::unique_ptr<Item> RocksDBKVStore::makeItem(uint16_t vb,
                                               const DocKey& key,
                                               const rocksdb::Slice& s,
                                               GetMetaOnly getMetaOnly) {
    assert(s.size() >= sizeof(rockskv::MetaData));

    const char* data = s.data();
    // Copy out the fixed-size metadata header; the (optional) value bytes
    // follow immediately after it in the slice.
    rockskv::MetaData meta;
    std::memcpy(&meta, data, sizeof(meta));
    data += sizeof(meta);

    bool includeValue = getMetaOnly == GetMetaOnly::No && meta.valueSize;

    auto item = std::make_unique<Item>(key,
                                       meta.flags,
                                       meta.exptime,
                                       includeValue ? data : nullptr,
                                       includeValue ? meta.valueSize : 0,
                                       meta.datatype,
                                       meta.cas,
                                       meta.bySeqno,
                                       vb,
                                       meta.revSeqno);

    if (meta.deleted) {
        item->setDeleted();
    }

    return item;
}

// Convenience wrapper: builds a GetValue (engine result) around the Item
// de-serialized from 'value'.
GetValue RocksDBKVStore::makeGetValue(uint16_t vb,
                                      const DocKey& key,
                                      const std::string& value,
                                      GetMetaOnly getMetaOnly) {
    rocksdb::Slice sval(value);
    return GetValue(
            makeItem(vb, key, sval, getMetaOnly), ENGINE_SUCCESS, -1, 0);
}

// Loads the persisted vbucket_state JSON document (stored under the reserved
// negative key in the seqno CF - see getVbstateKey) and caches the decoded
// state in cachedVBStates[vbid]. Missing/unparsable state falls back to the
// defaults initialised below (dead state, zero seqnos/ids).
void RocksDBKVStore::readVBState(const VBHandle& vbh) {
    // Largely copied from CouchKVStore
    // TODO RDB: refactor out sections common to CouchKVStore
    vbucket_state_t state = vbucket_state_dead;
    uint64_t checkpointId = 0;
    uint64_t maxDeletedSeqno = 0;
    int64_t highSeqno = readHighSeqnoFromDisk(vbh);
    std::string failovers;
    uint64_t purgeSeqno = 0;
    uint64_t lastSnapStart = 0;
    uint64_t lastSnapEnd = 0;
    uint64_t maxCas = 0;
    int64_t hlcCasEpochSeqno = HlcCasSeqnoUninitialised;
    bool mightContainXattrs = false;

    auto key = getVbstateKey();
    std::string vbstate;
    auto vbid = vbh.vbid;
    auto status = rdb->Get(rocksdb::ReadOptions(),
                           vbh.seqnoCFH.get(),
                           getSeqnoSlice(&key),
                           &vbstate);
    if (!status.ok()) {
        if (status.IsNotFound()) {
            logger.log(EXTENSION_LOG_NOTICE,
                       "RocksDBKVStore::readVBState: '_local/vbstate.%" PRIu16
                       "' not found",
                       vbid);
        } else {
            logger.log(EXTENSION_LOG_WARNING,
                       "RocksDBKVStore::readVBState: error getting vbstate "
                       "error:%s, vb:%" PRIu16,
                       status.getState(),
                       vbid);
        }
    } else {
        cJSON* jsonObj = cJSON_Parse(vbstate.c_str());
        if (!jsonObj) {
            // NOTE(review): parsing continues below with jsonObj == nullptr;
            // this relies on cJSON_GetObjectItem tolerating a NULL object -
            // confirm, or bail out here after logging.
            logger.log(EXTENSION_LOG_WARNING,
                       "RocksKVStore::readVBState: Failed to parse the vbstat "
                       "json doc for vb:%" PRIu16 ", json:%s",
                       vbid,
                       vbstate.c_str());
        }

        // Pull each field out of the JSON doc as a string; absent fields come
        // back as "" and are defaulted below.
        const std::string vb_state =
                getJSONObjString(cJSON_GetObjectItem(jsonObj, "state"));
        const std::string checkpoint_id =
                getJSONObjString(cJSON_GetObjectItem(jsonObj, "checkpoint_id"));
        const std::string max_deleted_seqno = getJSONObjString(
                cJSON_GetObjectItem(jsonObj, "max_deleted_seqno"));
        const std::string snapStart =
                getJSONObjString(cJSON_GetObjectItem(jsonObj, "snap_start"));
        const std::string snapEnd =
                getJSONObjString(cJSON_GetObjectItem(jsonObj, "snap_end"));
        const std::string maxCasValue =
                getJSONObjString(cJSON_GetObjectItem(jsonObj, "max_cas"));
        const std::string hlcCasEpoch =
                getJSONObjString(cJSON_GetObjectItem(jsonObj, "hlc_epoch"));
        mightContainXattrs = getJSONObjBool(
                cJSON_GetObjectItem(jsonObj, "might_contain_xattrs"));

        cJSON* failover_json = cJSON_GetObjectItem(jsonObj, "failover_table");
        if (vb_state.compare("") == 0 || checkpoint_id.compare("") == 0 ||
            max_deleted_seqno.compare("") == 0) {
            logger.log(EXTENSION_LOG_WARNING,
                       "RocksDBKVStore::readVBState: State"
                       " JSON doc for vb:%" PRIu16
                       " is in the wrong format:%s, "
                       "vb state:%s, checkpoint id:%s and max deleted seqno:%s",
                       vbid,
                       vbstate.c_str(),
                       vb_state.c_str(),
                       checkpoint_id.c_str(),
                       max_deleted_seqno.c_str());
        } else {
            // NOTE(review): std::stoull throws std::invalid_argument on
            // non-numeric content - malformed on-disk state would propagate an
            // exception out of here; confirm callers can handle that.
            state = VBucket::fromString(vb_state.c_str());
            maxDeletedSeqno = std::stoull(max_deleted_seqno);
            checkpointId = std::stoull(checkpoint_id);

            // Missing snapshot markers default to the current high seqno.
            if (snapStart.compare("") == 0) {
                lastSnapStart = highSeqno;
            } else {
                lastSnapStart = std::stoull(snapStart.c_str());
            }

            if (snapEnd.compare("") == 0) {
                lastSnapEnd = highSeqno;
            } else {
                lastSnapEnd = std::stoull(snapEnd.c_str());
            }

            if (maxCasValue.compare("") != 0) {
                maxCas = std::stoull(maxCasValue.c_str());
            }

            if (!hlcCasEpoch.empty()) {
                hlcCasEpochSeqno = std::stoull(hlcCasEpoch);
            }

            if (failover_json) {
                failovers = to_string(failover_json, false);
            }
        }
        cJSON_Delete(jsonObj);
    }

    cachedVBStates[vbh.vbid] =
            std::make_unique<vbucket_state>(state,
                                            checkpointId,
                                            maxDeletedSeqno,
                                            highSeqno,
                                            purgeSeqno,
                                            lastSnapStart,
                                            lastSnapEnd,
                                            maxCas,
                                            hlcCasEpochSeqno,
                                            mightContainXattrs,
                                            failovers);
}

// Serializes 'vbState' to JSON (hand-built; numeric fields are written as
// JSON strings, matching what readVBState parses back) and adds a Put for it
// to 'batch' under the reserved vbstate key in the seqno CF.
rocksdb::Status RocksDBKVStore::saveVBStateToBatch(const VBHandle& vbh,
                                                   const vbucket_state& vbState,
                                                   rocksdb::WriteBatch& batch) {
    std::stringstream jsonState;

    jsonState << "{\"state\": \"" << VBucket::toString(vbState.state) << "\""
              << ",\"checkpoint_id\": \"" << vbState.checkpointId << "\""
              << ",\"max_deleted_seqno\": \"" << vbState.maxDeletedSeqno
              << "\"";
    // The failover table is already a JSON value, so it is emitted unquoted.
    if (!vbState.failovers.empty()) {
        jsonState << ",\"failover_table\": " << vbState.failovers;
    }
    jsonState << ",\"snap_start\": \"" << vbState.lastSnapStart << "\""
              << ",\"snap_end\": \"" << vbState.lastSnapEnd << "\""
              << ",\"max_cas\": \"" << vbState.maxCas << "\""
              << ",\"hlc_epoch\": \"" << vbState.hlcCasEpochSeqno << "\"";

    if (vbState.mightContainXattrs) {
        jsonState << ",\"might_contain_xattrs\": true";
    } else {
        jsonState << ",\"might_contain_xattrs\": false";
    }

    jsonState << "}";

    auto key = getVbstateKey();
    rocksdb::Slice keySlice = getSeqnoSlice(&key);
    return batch.Put(vbh.seqnoCFH.get(), keySlice, jsonState.str());
}

// Returns the baseline ColumnFamilyOptions for the 'default' CF (before any
// user-provided overrides are layered on by applyUserCFOptions).
rocksdb::ColumnFamilyOptions RocksDBKVStore::getBaselineDefaultCFOptions() {
    rocksdb::ColumnFamilyOptions cfOptions;
    // Enable Point Lookup Optimization for the 'default' Column Family
    // Note: whatever we give in input as 'block_cache_size_mb',
    // the Block Cache will be reset with the shared 'blockCache' of size
    // 'rocksdb_block_cache_size' (see applyUserCFOptions below).
    cfOptions.OptimizeForPointLookup(1);
    return cfOptions;
}

// Returns the baseline ColumnFamilyOptions for the 'seqno' CF. The custom
// comparator orders keys as numeric (binary int64) seqnos rather than
// lexicographically.
rocksdb::ColumnFamilyOptions RocksDBKVStore::getBaselineSeqnoCFOptions() {
    rocksdb::ColumnFamilyOptions cfOptions;
    cfOptions.comparator = &seqnoComparator;
    return cfOptions;
}

// Layers user-provided option strings on top of 'cfOptions':
//   - 'newCfOptions' is parsed as ColumnFamilyOptions overrides;
//   - 'newBbtOptions' is parsed as BlockBasedTableOptions overrides and
//     installed via a fresh BlockBasedTableFactory.
// Throws std::invalid_argument if either string fails to parse.
void RocksDBKVStore::applyUserCFOptions(rocksdb::ColumnFamilyOptions& cfOptions,
                                        const std::string& newCfOptions,
                                        const std::string& newBbtOptions) {
    // Apply 'newCfOptions' on top of 'cfOptions'
    auto status = rocksdb::GetColumnFamilyOptionsFromString(
            cfOptions, newCfOptions, &cfOptions);
    if (!status.ok()) {
        throw std::invalid_argument(
                std::string("RocksDBKVStore::applyUserCFOptions: "
                            "GetColumnFamilyOptionsFromString error: ") +
                status.getState());
    }

    // RocksDB ColumnFamilyOptions provide advanced options for the
    // Block Based Table file format, which is the default format for SST files.
    // Apply 'newBbtOptions' on top of the current BlockBasedTableOptions of
    // 'cfOptions'
    rocksdb::BlockBasedTableOptions baseOptions;
    if (cfOptions.table_factory) {
        auto* bbtOptions = cfOptions.table_factory->GetOptions();
        if (bbtOptions) {
            baseOptions = *(
                    static_cast<rocksdb::BlockBasedTableOptions*>(bbtOptions));
        }
    }

    rocksdb::BlockBasedTableOptions tableOptions;
    status = rocksdb::GetBlockBasedTableOptionsFromString(
            baseOptions, newBbtOptions, &tableOptions);
    if (!status.ok()) {
        throw std::invalid_argument(
                std::string("RocksDBKVStore::applyUserCFOptions: "
                            "GetBlockBasedTableOptionsFromString error: ") +
                status.getState());
    }

    // If using Partitioned Filters, then use the RocksDB recommended params
    // (https://github.com/facebook/rocksdb/blob/master/include/rocksdb/filter_policy.h#L133):
    //     "bits_per_key: bits per key in bloom filter. A good value for
    //           bits_per_key is 10, which yields a filter with ~1% false
    //           positive rate.
    //       use_block_based_builder: use block based filter rather than full
    //           filter. If you want to build a full filter, it needs to be
    //           set to false."
    if (tableOptions.partition_filters == true) {
        tableOptions.filter_policy.reset(
                rocksdb::NewBloomFilterPolicy(10, false));
    }

    // Always use the per-shard shared Block Cache. If it is nullptr, RocksDB
    // will allocate a default size Block Cache.
    tableOptions.block_cache = blockCache;

    // Set the new BlockBasedTableOptions
    cfOptions.table_factory.reset(
            rocksdb::NewBlockBasedTableFactory(tableOptions));

    // Set the user-provided size amplification factor if under Universal
    // Compaction
    if (cfOptions.compaction_style ==
        rocksdb::CompactionStyle::kCompactionStyleUniversal) {
        auto& configuration =
                dynamic_cast<RocksDBKVStoreConfig&>(this->configuration);
        cfOptions.compaction_options_universal.max_size_amplification_percent =
                configuration.getUCMaxSizeAmplificationPercent();
    }
}

// Writes 'batch' to the DB and records the elapsed time in commitHisto.
// NOTE(review): the WriteBatch is taken by value, so every call copies the
// entire (potentially write_buffer_size-large) batch. Passing by reference
// would avoid the copy - requires matching the declaration in the header.
rocksdb::Status RocksDBKVStore::writeAndTimeBatch(rocksdb::WriteBatch batch) {
    auto begin = ProcessClock::now();
    auto status = rdb->Write(writeOptions, &batch);
    st.commitHisto.add(std::chrono::duration_cast<std::chrono::microseconds>(
            ProcessClock::now() - begin));
    return status;
}

// Persists a commit batch of mutations for 'vbid': each request is added to a
// WriteBatch (flushed early if it grows past the memtable budget), then the
// vbucket state is appended and the final batch written. On success the
// cached vbstate's highSeqno is advanced to the max seqno seen.
rocksdb::Status RocksDBKVStore::saveDocs(
        uint16_t vbid,
        const Item* collectionsManifest,
        const std::vector<std::unique_ptr<RocksRequest>>& commitBatch) {
    auto reqsSize = commitBatch.size();
    if (reqsSize == 0) {
        st.docsCommitted = 0;
        return rocksdb::Status::OK();
    }

    auto& vbstate = cachedVBStates[vbid];
    if (vbstate == nullptr) {
        throw std::logic_error("RocksDBKVStore::saveDocs: cachedVBStates[" +
                               std::to_string(vbid) + "] is NULL");
    }

    rocksdb::Status status;
    int64_t maxDBSeqno = 0;
    rocksdb::WriteBatch batch;

    const auto vbh = getVBHandle(vbid);

    for (const auto& request : commitBatch) {
        int64_t bySeqno = request->getDocMeta().bySeqno;
        maxDBSeqno = std::max(maxDBSeqno, bySeqno);

        status = addRequestToWriteBatch(*vbh, batch, request.get());
        if (!status.ok()) {
            logger.log(EXTENSION_LOG_WARNING,
                       "RocksDBKVStore::saveDocs: addRequestToWriteBatch "
                       "error:%d, vb:%" PRIu16,
                       status.code(),
                       vbid);
            return status;
        }

        // Check if we should split into a new writeBatch if the batch size
        // exceeds the write_buffer_size - this is necessary because we
        // don't want our WriteBatch to exceed the configured memtable size, as
        // that can cause significant memory bloating (see MB-26521).
        // Note the limit check is only approximate, as the batch contains
        // updates for at least 2 CFs (key & seqno) which will be written into
        // separate memtables, so we don't exactly know the size contribution
        // to each memtable in the batch.
        // NOTE(review): 'batchLimit' is loop-invariant here (both CF option
        // structs are only mutated elsewhere); it could be hoisted above the
        // loop, though the recomputation is cheap.
        const auto batchLimit = defaultCFOptions.write_buffer_size +
                                seqnoCFOptions.write_buffer_size;
        if (batch.GetDataSize() > batchLimit) {
            status = writeAndTimeBatch(batch);
            if (!status.ok()) {
                logger.log(EXTENSION_LOG_WARNING,
                           "RocksDBKVStore::saveDocs: rocksdb::DB::Write "
                           "error:%d, "
                           "vb:%" PRIu16,
                           status.code(),
                           vbid);
                return status;
            }
            batch.Clear();
        }
    }

    // Append the (possibly updated) vbucket state to the final batch so state
    // and documents land atomically in the same Write.
    status = saveVBStateToBatch(*vbh, *vbstate, batch);
    if (!status.ok()) {
        logger.log(EXTENSION_LOG_WARNING,
                   "RocksDBKVStore::saveDocs: saveVBStateToBatch error:%d",
                   status.code());
        return status;
    }

    status = writeAndTimeBatch(batch);
    if (!status.ok()) {
        logger.log(EXTENSION_LOG_WARNING,
                   "RocksDBKVStore::saveDocs: rocksdb::DB::Write error:%d, "
                   "vb:%" PRIu16,
                   status.code(),
                   vbid);
        return status;
    }

    st.batchSize.add(reqsSize);
    st.docsCommitted = reqsSize;

    // Update high seqno
    vbstate->highSeqno = maxDBSeqno;

    return rocksdb::Status::OK();
}

// Adds two Puts for one document to 'batch':
//   default CF:  key            -> [MetaData][body]
//   seqno CF:    bySeqno (int64) -> key
// Returns the first non-OK status, logging it; otherwise OK.
rocksdb::Status RocksDBKVStore::addRequestToWriteBatch(
        const VBHandle& vbh,
        rocksdb::WriteBatch& batch,
        RocksRequest* request) {
    uint16_t vbid = request->getVBucketId();

    rocksdb::Slice keySlice = getKeySlice(request->getKey());
    rocksdb::SliceParts keySliceParts(&keySlice, 1);

    // Meta and body are written as two slices so no concatenated copy of the
    // document is needed.
    rocksdb::Slice docSlices[] = {request->getDocMetaSlice(),
                                  request->getDocBodySlice()};
    rocksdb::SliceParts valueSliceParts(docSlices, 2);

    rocksdb::Slice bySeqnoSlice = getSeqnoSlice(&request->getDocMeta().bySeqno);
    // We use the `saveDocsHisto` to track the time spent on
    // `rocksdb::WriteBatch::Put()`.
    auto begin = ProcessClock::now();
    auto status =
            batch.Put(vbh.defaultCFH.get(), keySliceParts, valueSliceParts);
    if (!status.ok()) {
        logger.log(EXTENSION_LOG_WARNING,
                   "RocksDBKVStore::saveDocs: rocksdb::WriteBatch::Put "
                   "[ColumnFamily: \'default\'] error:%d, "
                   "vb:%" PRIu16,
                   status.code(),
                   vbid);
        return status;
    }
    status = batch.Put(vbh.seqnoCFH.get(), bySeqnoSlice, keySlice);
    if (!status.ok()) {
        logger.log(EXTENSION_LOG_WARNING,
                   "RocksDBKVStore::saveDocs: rocksdb::WriteBatch::Put "
                   "[ColumnFamily: \'seqno\'] error:%d, "
                   "vb:%" PRIu16,
                   status.code(),
                   vbid);
        return status;
    }
    st.saveDocsHisto.add(std::chrono::duration_cast<std::chrono::microseconds>(
            ProcessClock::now() - begin));

    return rocksdb::Status::OK();
}

// Returns the highest document seqno persisted in the given vbucket's seqno
// CF, or 0 if the CF is empty or only contains the (negative-keyed) vbstate.
int64_t RocksDBKVStore::readHighSeqnoFromDisk(const VBHandle& vbh) {
    std::unique_ptr<rocksdb::Iterator> it(
            rdb->NewIterator(rocksdb::ReadOptions(), vbh.seqnoCFH.get()));

    // Seek to the highest seqno=>key mapping stored for the vbid
    auto maxSeqno = std::numeric_limits<int64_t>::max();
    rocksdb::Slice maxSeqnoSlice = getSeqnoSlice(&maxSeqno);
    it->SeekForPrev(maxSeqnoSlice);

    if (!it->Valid()) {
        return 0;
    }
    auto highSeqno = getNumericSeqno(it->key());
    // We use a negative seqno as key for VBState. Do not consider it.
    return highSeqno >= 0 ? highSeqno : 0;
}

// Returns the reserved key under which the vbucket state document is stored
// in the seqno CF.
int64_t RocksDBKVStore::getVbstateKey() {
    // We put the VBState into the SeqnoCF.
    // As items in the SeqnoCF are ordered by increasing-seqno, we reserve a
    // negative special key to VBState so that we can access it in O(1).
    return -9999;
}

// Creates a new backfill/scan context for 'vbid': takes a RocksDB snapshot
// (registered in scanSnapshots under a fresh scanId) so the scan sees a
// consistent view, and returns a heap-allocated ScanContext which the caller
// must release via destroyScanContext().
ScanContext* RocksDBKVStore::initScanContext(
        std::shared_ptr<StatusCallback<GetValue>> cb,
        std::shared_ptr<StatusCallback<CacheLookup>> cl,
        uint16_t vbid,
        uint64_t startSeqno,
        DocumentFilter options,
        ValueFilter valOptions) {
    size_t scanId = scanCounter++;

    {
        std::lock_guard<std::mutex> lg(scanSnapshotsMutex);
        scanSnapshots.emplace(scanId, SnapshotPtr(rdb->GetSnapshot(), *rdb));
    }

    // As we cannot efficiently determine how many documents this scan will
    // find, we approximate this value with the seqno difference + 1
    // as scan is supposed to be inclusive at both ends,
    // seqnos 2 to 4 covers 3 docs not 4 - 2 = 2

    uint64_t endSeqno = cachedVBStates[vbid]->highSeqno;
    // NOTE(review): if startSeqno > endSeqno + 1 the unsigned subtraction
    // below wraps; presumably callers always pass startSeqno <= highSeqno + 1
    // - confirm.
    return new ScanContext(cb,
                           cl,
                           vbid,
                           scanId,
                           startSeqno,
                           endSeqno,
                           0, /*TODO RDB: pass the real purge-seqno*/
                           options,
                           valOptions,
                           /* documentCount */ endSeqno - startSeqno + 1,
                           configuration);
}

// Iterates the seqno CF (within the context's snapshot) from the resume point
// up to ctx->maxSeqno inclusive, fetching each document from the default CF
// and feeding it through the cache-lookup and value callbacks. Returns
// scan_again if either callback reports ENGINE_ENOMEM (progress is recorded
// in ctx->lastReadSeqno so the scan can resume), scan_success otherwise.
scan_error_t RocksDBKVStore::scan(ScanContext* ctx) {
    if (!ctx) {
        return scan_failed;
    }

    // Already fully consumed on a previous call.
    if (ctx->lastReadSeqno == ctx->maxSeqno) {
        return scan_success;
    }

    // Resume one past the last seqno handed to the callbacks.
    auto startSeqno = ctx->startSeqno;
    if (ctx->lastReadSeqno != 0) {
        startSeqno = ctx->lastReadSeqno + 1;
    }

    TRACE_EVENT2("RocksDBKVStore",
                 "scan",
                 "vbid",
                 ctx->vbid,
                 "startSeqno",
                 startSeqno);

    GetMetaOnly isMetaOnly = ctx->valFilter == ValueFilter::KEYS_ONLY
                                     ? GetMetaOnly::Yes
                                     : GetMetaOnly::No;

    rocksdb::ReadOptions snapshotOpts{rocksdb::ReadOptions()};

    // Lock for safe access to the scanSnapshots map and to ensure the snapshot
    // doesn't get destroyed whilst we have the pointer.
    // @todo use a shared_ptr and reduce the lock scope to just the map::at call
    std::lock_guard<std::mutex> lg(scanSnapshotsMutex);
    snapshotOpts.snapshot = scanSnapshots.at(ctx->scanId).get();

    rocksdb::Slice startSeqnoSlice = getSeqnoSlice(&startSeqno);
    const auto vbh = getVBHandle(ctx->vbid);
    std::unique_ptr<rocksdb::Iterator> it(
            rdb->NewIterator(snapshotOpts, vbh->seqnoCFH.get()));
    if (!it) {
        throw std::logic_error(
                "RocksDBKVStore::scan: rocksdb::Iterator to Seqno Column "
                "Family is nullptr");
    }
    it->Seek(startSeqnoSlice);

    rocksdb::Slice endSeqnoSlice = getSeqnoSlice(&ctx->maxSeqno);
    auto isPastEnd = [&endSeqnoSlice, this](rocksdb::Slice seqSlice) {
        return seqnoComparator.Compare(seqSlice, endSeqnoSlice) == 1;
    };

    for (; it->Valid() && !isPastEnd(it->key()); it->Next()) {
        scanTotalSeqnoHits++;
        auto seqno = getNumericSeqno(it->key());
        rocksdb::Slice keySlice = it->value();
        std::string valueStr;
        auto s = rdb->Get(
                snapshotOpts, vbh->defaultCFH.get(), keySlice, &valueStr);

        if (!s.ok()) {
            // TODO RDB: Old seqnos are never removed from the db!
            // If the item does not exist (s.isNotFound())
            // the seqno => key mapping could be removed; not even
            // a tombstone remains of that item.

            // Note: I account also the hits for deleted documents because it
            // is logically correct. But, we switch on the RocksDB built-in
            // Bloom Filter by default and we try to keep all the Filter blocks
            // in the BlockCache. So, I expect that the impact of old-seqno hits
            // is minimum in this case.
            scanOldSeqnoHits++;

            continue;
        }

        rocksdb::Slice valSlice(valueStr);

        // TODO RDB: Deal with collections
        DocKey key(reinterpret_cast<const uint8_t*>(keySlice.data()),
                   keySlice.size(),
                   DocNamespace::DefaultCollection);

        std::unique_ptr<Item> itm =
                makeItem(ctx->vbid, key, valSlice, isMetaOnly);

        if (itm->getBySeqno() > seqno) {
            // TODO RDB: Old seqnos are never removed from the db!
            // If the item has a newer seqno now, the stale
            // seqno => key mapping could be removed
            scanOldSeqnoHits++;
            continue;
        } else if (itm->getBySeqno() < seqno) {
            // NOTE(review): the concatenated literals are missing a space
            // between "seqno" and "than".
            throw std::logic_error(
                    "RocksDBKVStore::scan: index has a higher seqno"
                    "than the document in a snapshot!");
        }

        bool includeDeletes =
                (ctx->docFilter == DocumentFilter::NO_DELETES) ? false : true;
        bool onlyKeys =
                (ctx->valFilter == ValueFilter::KEYS_ONLY) ? true : false;

        if (!includeDeletes && itm->isDeleted()) {
            continue;
        }
        int64_t byseqno = itm->getBySeqno();
        CacheLookup lookup(key,
                           byseqno,
                           ctx->vbid,
                           ctx->collectionsContext.getSeparator());
        ctx->lookup->callback(lookup);

        int status = ctx->lookup->getStatus();

        if (status == ENGINE_KEY_EEXISTS) {
            // Already resident in memory - no need to emit the value.
            ctx->lastReadSeqno = byseqno;
            continue;
        } else if (status == ENGINE_ENOMEM) {
            return scan_again;
        }

        GetValue rv(std::move(itm), ENGINE_SUCCESS, -1, onlyKeys);
        ctx->callback->callback(rv);
        status = ctx->callback->getStatus();

        if (status == ENGINE_ENOMEM) {
            return scan_again;
        }

        ctx->lastReadSeqno = byseqno;
    }

    cb_assert(it->status().ok()); // Check for any errors found during the scan

    return scan_success;
}

// Releases the snapshot taken by initScanContext (if still registered) and
// deletes the context. Safe to call with nullptr.
void RocksDBKVStore::destroyScanContext(ScanContext* ctx) {
    if (ctx == nullptr) {
        return;
    }
    std::lock_guard<std::mutex> lg(scanSnapshotsMutex);
    // TODO RDB: Might be nice to have the snapshot in the ctx and
    // release it on destruction
    auto it = scanSnapshots.find(ctx->scanId);
    if (it != scanSnapshots.end()) {
        scanSnapshots.erase(it);
    }
    delete ctx;
}

// Fetches one memory-usage figure (of the given UsageType) via RocksDB's
// MemoryUtil API into 'value'. Returns false (after logging) on failure.
bool RocksDBKVStore::getStatFromMemUsage(
        const rocksdb::MemoryUtil::UsageType type, size_t& value) {
    std::vector<rocksdb::DB*> dbs = {rdb.get()};
    auto cache_set = getCachePointers();
    std::map<rocksdb::MemoryUtil::UsageType, uint64_t> usageByType;

    auto status = rocksdb::MemoryUtil::GetApproximateMemoryUsageByType(
            dbs, cache_set, &usageByType);
    if (!status.ok()) {
        logger.log(EXTENSION_LOG_NOTICE,
                   "RocksDBKVStore::getStatFromMemUsage: "
                   "GetApproximateMemoryUsageByType error: %s",
                   status.getState());
        return false;
    }

    value = usageByType.at(type);

    return true;
}

// Reads one RocksDB Statistics ticker into 'value'. Returns false if
// statistics collection is not enabled on this DB.
bool RocksDBKVStore::getStatFromStatistics(const rocksdb::Tickers ticker,
                                           size_t& value) {
    const auto statistics = rdb->GetDBOptions().statistics;
    if (!statistics) {
        return false;
    }
    value = statistics->getTickerCount(ticker);
    return true;
}

// Sums an integer-valued DB property over the chosen CF of every open vbucket
// handle into 'value'. Returns false if any handle/property lookup fails.
// NOTE(review): 'const auto vbh' copies a shared_ptr (atomic refcount) per
// iteration; 'const auto&' (as used in applyMemtablesQuota) would avoid it.
bool RocksDBKVStore::getStatFromProperties(ColumnFamily cf,
                                           const std::string& property,
                                           size_t& value) {
    value = 0;
    std::lock_guard<std::mutex> lg(vbhMutex);
    for (const auto vbh : vbHandles) {
        if (vbh) {
            rocksdb::ColumnFamilyHandle* cfh = nullptr;
            switch (cf) {
            case ColumnFamily::Default:
                cfh = vbh->defaultCFH.get();
                break;
            case ColumnFamily::Seqno:
                cfh = vbh->seqnoCFH.get();
                break;
            }
            if (!cfh) {
                return false;
            }
            std::string out;
            if (!rdb->GetProperty(cfh, property, &out)) {
                return false;
            }
            value += std::stoull(out);
        }
    }

    return true;
}

// As we implement a VBucket as a pair of two Column Families (a
// 'default' CF and a 'local+seqno' CF), we need to re-set the
// 'write_buffer_size' for each CF when the number of VBuckets managed by the
// current store changes. The goal is to keep the total allocation for all the
// Memtables under the 'rocksdb_memtables_ratio' given in configuration.
// Thus, this function performs the following basic steps:
//     1) Re-calculate the new sizes of all Memtables;
//     2) Apply the new sizes.
// We apply the new sizes using the rocksdb::DB::SetOptions() API. The
// 'write_buffer_size' is a dynamically changeable option. This call changes
// the size of mutable Memtables instantly. If the new size is below the
// current allocation for the Memtable, the next key-value pair added will mark
// the Memtable as immutable and will trigger a flush.
void RocksDBKVStore::applyMemtablesQuota(
        const std::lock_guard<std::mutex>& lock) {
    const auto vbuckets = getVBucketsCount(lock);

    auto& configuration =
            dynamic_cast<RocksDBKVStoreConfig&>(this->configuration);

    // 1) If configuration.getMemtablesRatio() == 0.0, then
    //    we just want to use the baseline write_buffer_size.
    // 2) If vbuckets == 0, then there is no Memtable (this happens only
    //    when the underlying RocksDB instance has just been created).
    // On both cases the following logic does not apply, so the
    // write_buffer_size for both the 'default' and the 'seqno' CFs is left
    // to the baseline value.
    if (configuration.getMemtablesRatio() > 0.0 && vbuckets > 0) {
        const auto memtablesQuota = configuration.getBucketQuota() /
                                    configuration.getMaxShards() *
                                    configuration.getMemtablesRatio();
        // TODO: for now I am hard-coding the percentage of Memtables Quota
        // that we allocate for the 'deafult' (90%) and 'seqno' (10%) CFs. The
        // plan is to expose this percentage as a configuration parameter in a
        // follow-up patch.
        const auto defaultCFMemtablesQuota = memtablesQuota * 0.9;
        const auto seqnoCFMemtablesQuota =
                memtablesQuota - defaultCFMemtablesQuota;

        // Set the the write_buffer_size for the 'default' CF
        defaultCFOptions.write_buffer_size =
                defaultCFMemtablesQuota / vbuckets /
                defaultCFOptions.max_write_buffer_number;
        // Set the write_buffer_size for the 'seqno' CF
        seqnoCFOptions.write_buffer_size =
                seqnoCFMemtablesQuota / vbuckets /
                seqnoCFOptions.max_write_buffer_number;

        // Apply the new write_buffer_size to every open vbucket's CFs via the
        // dynamic-options API.
        const std::unordered_map<std::string, std::string>
                newDefaultCFWriteBufferSize{std::make_pair(
                        "write_buffer_size",
                        std::to_string(defaultCFOptions.write_buffer_size))};
        const std::unordered_map<std::string, std::string>
                newSeqnoCFWriteBufferSize{std::make_pair(
                        "write_buffer_size",
                        std::to_string(seqnoCFOptions.write_buffer_size))};
        for (const auto& vbh : vbHandles) {
            if (vbh) {
                auto status = rdb->SetOptions(vbh->defaultCFH.get(),
                                              newDefaultCFWriteBufferSize);
                if (!status.ok()) {
                    throw std::runtime_error(
                            "RocksDBKVStore::applyMemtablesQuota: SetOptions "
                            "failed for [vbid: " +
                            std::to_string(vbh->vbid) + ", CF: default]: " +
                            status.getState());
                }
                status = rdb->SetOptions(vbh->seqnoCFH.get(),
                                         newSeqnoCFWriteBufferSize);
                if (!status.ok()) {
                    throw std::runtime_error(
                            "RocksDBKVStore::applyMemtablesQuota: SetOptions "
                            "failed for [vbid: " +
                            std::to_string(vbh->vbid) + ", CF: seqno]: " +
                            status.getState());
                }
            }
        }
    }

    // Overwrite Compaction options if Compaction Optimization is enabled
    // for the 'default' CF
    if (configuration.getDefaultCfOptimizeCompaction() == "level") {
        defaultCFOptions.OptimizeLevelStyleCompaction(
                defaultCFOptions.write_buffer_size);
    } else if (configuration.getDefaultCfOptimizeCompaction() == "universal") {
        defaultCFOptions.OptimizeUniversalStyleCompaction(
                defaultCFOptions.write_buffer_size);
    }
    // Overwrite Compaction options if Compaction Optimization is enabled
    // for the 'seqno' CF
    if (configuration.getSeqnoCfOptimizeCompaction() == "level") {
        seqnoCFOptions.OptimizeLevelStyleCompaction(
                seqnoCFOptions.write_buffer_size);
    } else if (configuration.getSeqnoCfOptimizeCompaction() == "universal") {
        seqnoCFOptions.OptimizeUniversalStyleCompaction(
                seqnoCFOptions.write_buffer_size);
    }
}

// Counts how many vbucket handles are currently open. The (unused) lock_guard
// parameter documents that the caller must hold the handles mutex.
size_t RocksDBKVStore::getVBucketsCount(
        const std::lock_guard<std::mutex>&) const {
    uint16_t count = 0;
    for (const auto& vbh : vbHandles) {
        if (vbh) {
            count++;
        }
    }
    return count;
}