1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3 * Copyright 2017 Couchbase, Inc.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 #include "config.h"
19
20 #include "rocksdb-kvstore.h"
21 #include "rocksdb-kvstore_config.h"
22
23 #include "ep_time.h"
24
25 #include "kvstore_priv.h"
26
27 #include <phosphor/phosphor.h>
28 #include <platform/sysinfo.h>
29 #include <rocksdb/convenience.h>
30 #include <rocksdb/filter_policy.h>
31
32 #include <stdio.h>
33 #include <string.h>
34 #include <algorithm>
35 #include <gsl/gsl>
36 #include <limits>
37 #include <thread>
38
39 #include "vbucket.h"
40
namespace rockskv {
// MetaData is used to serialize and de-serialize metadata respectively when
// writing a Document mutation request to RocksDB and when reading a Document
// from RocksDB.
class MetaData {
public:
    // Default-construct with every field zeroed.
    MetaData()
        : deleted(0),
          version(0),
          datatype(0),
          flags(0),
          valueSize(0),
          exptime(0),
          cas(0),
          revSeqno(0),
          bySeqno(0) {
    }

    // Construct fully-populated metadata for a document.
    MetaData(bool isDeleted,
             uint8_t metaVersion,
             uint8_t docDatatype,
             uint32_t docFlags,
             uint32_t docValueSize,
             time_t docExptime,
             uint64_t docCas,
             uint64_t docRevSeqno,
             int64_t docBySeqno)
        : deleted(isDeleted),
          version(metaVersion),
          datatype(docDatatype),
          flags(docFlags),
          valueSize(docValueSize),
          exptime(docExptime),
          cas(docCas),
          revSeqno(docRevSeqno),
          bySeqno(docBySeqno) {
    }

    // The `#pragma pack(1)` directive and the order of members are to keep
    // the size of MetaData as small as possible and uniform across different
    // platforms.
    // NOTE(review): 'time_t' width is itself platform-dependent, so the
    // serialized layout may still vary across platforms - confirm.
#pragma pack(1)
    uint8_t deleted : 1;
    uint8_t version : 7;
    uint8_t datatype;
    uint32_t flags;
    uint32_t valueSize;
    time_t exptime;
    uint64_t cas;
    uint64_t revSeqno;
    int64_t bySeqno;
#pragma pack()
};
} // namespace rockskv
92
/**
 * Class representing a document to be persisted in RocksDB.
 *
 * Wraps the Item's metadata (as rockskv::MetaData) and its value so that
 * both can be handed to RocksDB as Slices without further copying.
 */
class RocksRequest : public IORequest {
public:
    /**
     * Constructor
     *
     * @param item Item instance to be persisted
     * @param callback Persistence Callback
     */
    RocksRequest(const Item& item, MutationRequestCallback& callback)
        : IORequest(item.getVBucketId(),
                    callback,
                    item.isDeleted(),
                    item.getKey()),
          docBody(item.getValue()) {
        docMeta = rockskv::MetaData(
                item.isDeleted(),
                0,
                item.getDataType(),
                item.getFlags(),
                item.getNBytes(),
                // For deletions the delete-time (now) is stored in the
                // exptime field instead of the item's own expiry.
                item.isDeleted() ? ep_real_time() : item.getExptime(),
                item.getCas(),
                item.getRevSeqno(),
                item.getBySeqno());
    }

    // Access the serialized document metadata.
    const rockskv::MetaData& getDocMeta() {
        return docMeta;
    }

    // Get a rocksdb::Slice wrapping the Document MetaData
    rocksdb::Slice getDocMetaSlice() {
        return rocksdb::Slice(reinterpret_cast<char*>(&docMeta),
                              sizeof(docMeta));
    }

    // Get a rocksdb::Slice wrapping the Document Body
    // (an empty Slice if the item carries no value, e.g. a deletion).
    rocksdb::Slice getDocBodySlice() {
        const char* data = docBody ? docBody->getData() : nullptr;
        size_t size = docBody ? docBody->valueSize() : 0;
        return rocksdb::Slice(data, size);
    }

private:
    rockskv::MetaData docMeta;
    value_t docBody;
};
144
// RocksDB docs suggest to "Use `rocksdb::DB::DestroyColumnFamilyHandle()` to
// close a column family instead of deleting the column family handle directly"
//
// Custom deleter for a unique_ptr-managed ColumnFamilyHandle; releases the
// handle through the owning DB as the docs require.
struct ColumnFamilyDeleter {
    // Note: intentionally non-explicit so a rocksdb::DB& converts to a
    // deleter when constructing a ColumnFamilyPtr (see VBHandle).
    ColumnFamilyDeleter(rocksdb::DB& db) : db(db) {
    }
    void operator()(rocksdb::ColumnFamilyHandle* cfh) {
        db.DestroyColumnFamilyHandle(cfh);
    }

private:
    rocksdb::DB& db;
};
// Owning pointer to a ColumnFamilyHandle, released via the DB on destruction.
using ColumnFamilyPtr =
        std::unique_ptr<rocksdb::ColumnFamilyHandle, ColumnFamilyDeleter>;
159
160 // The 'VBHandle' class is a wrapper around the ColumnFamilyHandles
161 // for a VBucket.
162 class VBHandle {
163 public:
VBHandle(rocksdb::DB& rdb, rocksdb::ColumnFamilyHandle* defaultCFH, rocksdb::ColumnFamilyHandle* seqnoCFH, uint16_t vbid)164 VBHandle(rocksdb::DB& rdb,
165 rocksdb::ColumnFamilyHandle* defaultCFH,
166 rocksdb::ColumnFamilyHandle* seqnoCFH,
167 uint16_t vbid)
168 : rdb(rdb),
169 defaultCFH(ColumnFamilyPtr(defaultCFH, rdb)),
170 seqnoCFH(ColumnFamilyPtr(seqnoCFH, rdb)),
171 vbid(vbid) {
172 }
173
dropColumnFamilies()174 void dropColumnFamilies() {
175 // The call to DropColumnFamily() records the drop in the Manifest, but
176 // the actual remove will happen when the ColumFamilyHandle is deleted.
177 auto status = rdb.DropColumnFamily(defaultCFH.get());
178 if (!status.ok()) {
179 throw std::runtime_error(
180 "VBHandle::dropColumnFamilies: DropColumnFamily failed for "
181 "[vbid: " +
182 std::to_string(vbid) + ", CF: default]: " +
183 status.getState());
184 }
185 status = rdb.DropColumnFamily(seqnoCFH.get());
186 if (!status.ok()) {
187 throw std::runtime_error(
188 "VBHandle::dropColumnFamilies: DropColumnFamily failed for "
189 "[vbid: " +
190 std::to_string(vbid) + ", CF: seqno]: " +
191 status.getState());
192 }
193 }
194
195 rocksdb::DB& rdb;
196 const ColumnFamilyPtr defaultCFH;
197 const ColumnFamilyPtr seqnoCFH;
198 const uint16_t vbid;
199 };
200
// Constructor: builds the RocksDB DB/CF options from the configuration,
// opens the per-shard DB (creating it if missing) and loads the persisted
// state of every VBucket owned by this shard.
// Throws std::invalid_argument on malformed DB-option strings and
// std::runtime_error if the DB cannot be opened (via openDB).
RocksDBKVStore::RocksDBKVStore(RocksDBKVStoreConfig& configuration)
    : KVStore(configuration),
      vbHandles(configuration.getMaxVBuckets()),
      in_transaction(false),
      scanCounter(0),
      logger(configuration.getLogger()) {
    cachedVBStates.resize(configuration.getMaxVBuckets());
    // Sync the WAL on every rdb->Write() for durability.
    writeOptions.sync = true;

    // The RocksDB Options is a set of DBOptions and ColumnFamilyOptions.
    // Together they cover all RocksDB available parameters.
    auto status = rocksdb::GetDBOptionsFromString(
            dbOptions, configuration.getDBOptions(), &dbOptions);
    if (!status.ok()) {
        throw std::invalid_argument(
                std::string("RocksDBKVStore::open: GetDBOptionsFromString "
                            "error: ") +
                status.getState());
    }

    // Set number of background threads - note these are per-environment, so
    // are shared across all DB instances (vBuckets) and all Buckets.
    // A configured value of 0 means "use one thread per available CPU".
    auto lowPri = configuration.getLowPriBackgroundThreads();
    if (lowPri == 0) {
        lowPri = cb::get_available_cpu_count();
    }
    rocksdb::Env::Default()->SetBackgroundThreads(lowPri, rocksdb::Env::LOW);

    auto highPri = configuration.getHighPriBackgroundThreads();
    if (highPri == 0) {
        highPri = cb::get_available_cpu_count();
    }
    rocksdb::Env::Default()->SetBackgroundThreads(highPri, rocksdb::Env::HIGH);

    dbOptions.create_if_missing = true;

    // We use EventListener to set the correct ThreadLocal engine in the
    // ObjectRegistry for the RocksDB Flusher and Compactor threads. This
    // allows the memory tracker to track allocations and deallocations against
    // the appropriate bucket.
    auto eventListener =
            std::make_shared<EventListener>(ObjectRegistry::getCurrentEngine());
    dbOptions.listeners.emplace_back(eventListener);

    // Enable Statistics if 'Statistics::stat_level_' is provided by the
    // configuration. We create a statistics object and pass to the multiple
    // DBs managed by the same KVStore. Then the statistics object will contain
    // aggregated values for all those DBs. Note that some stats are undefined
    // and have no meaningful information across multiple DBs (e.g.,
    // "rocksdb.sequence.number").
    if (!configuration.getStatsLevel().empty()) {
        dbOptions.statistics = rocksdb::CreateDBStatistics();
        dbOptions.statistics->stats_level_ =
                getStatsLevel(configuration.getStatsLevel());
    }

    // Apply the environment rate limit for Flush and Compaction
    dbOptions.rate_limiter = configuration.getEnvRateLimiter();

    // Allocate the per-shard Block Cache
    if (configuration.getBlockCacheRatio() > 0.0) {
        auto blockCacheQuota = configuration.getBucketQuota() *
                               configuration.getBlockCacheRatio();
        // Keeping default settings for:
        // num_shard_bits = -1 (automatically determined)
        // strict_capacity_limit = false (do not fail insert when cache is full)
        blockCache = rocksdb::NewLRUCache(
                blockCacheQuota / configuration.getMaxShards(),
                -1 /*num_shard_bits*/,
                false /*strict_capacity_limit*/,
                configuration.getBlockCacheHighPriPoolRatio());
    }
    // Configure all the Column Families
    const auto& cfOptions = configuration.getCFOptions();
    const auto& bbtOptions = configuration.getBBTOptions();
    defaultCFOptions = getBaselineDefaultCFOptions();
    seqnoCFOptions = getBaselineSeqnoCFOptions();
    applyUserCFOptions(defaultCFOptions, cfOptions, bbtOptions);
    applyUserCFOptions(seqnoCFOptions, cfOptions, bbtOptions);

    // Open the DB and load the ColumnFamilyHandle for all the
    // existing Column Families (populates the 'vbHandles' vector)
    openDB();

    // Calculate and apply the correct write_buffer_size for all Column
    // Families. The Memtable size of each CF depends on the count of existing
    // CFs in DB (besides other things). Thus, this must be called after
    // 'openDB' (so that all the existing CFs have been loaded).
    applyMemtablesQuota(std::lock_guard<std::mutex>(vbhMutex));

    // Read persisted VBs state
    for (const auto vbh : vbHandles) {
        if (vbh) {
            readVBState(*vbh);
            // Update stats
            ++st.numLoadedVb;
        }
    }
}
300
// Destructor: releases all Column Family handles before the DB itself.
RocksDBKVStore::~RocksDBKVStore() {
    // Guarantees that all the ColumnFamilyHandles for the existing VBuckets
    // are released before 'rdb' is deleted. From RocksDB docs:
    // "Before delete DB, you have to close All column families by calling
    // DestroyColumnFamilyHandle() with all the handles."
    vbHandles.clear();
    // Any open transaction is implicitly abandoned.
    in_transaction = false;
}
309
// Open (or create) the per-shard RocksDB instance and build a VBHandle for
// every VBucket whose Column Families already exist on disk.
// Throws std::runtime_error if listing Column Families (for an existing DB)
// or opening the DB fails, std::logic_error if the on-disk CF set is
// inconsistent (default_ CF present without its seqno CF).
void RocksDBKVStore::openDB() {
    auto dbname = getDBSubdir();
    createDataDir(dbname);

    std::vector<std::string> cfs;
    auto status = rocksdb::DB::ListColumnFamilies(dbOptions, dbname, &cfs);
    if (!status.ok()) {
        // If ListColumnFamilies failed because the DB does not exist,
        // then it will be created the first time we call 'rocksdb::DB::Open'.
        // Else, we throw an error if ListColumnFamilies failed for any other
        // unexpected reason.
        if (!(status.code() == rocksdb::Status::kIOError &&
              std::string(status.getState())
                              .find("No such file or directory") !=
                      std::string::npos)) {
            throw std::runtime_error(
                    "RocksDBKVStore::openDB: ListColumnFamilies failed for DB "
                    "'" +
                    dbname + "': " + status.getState());
        }
    }

    // We need to pass a ColumnFamilyDescriptor for every existing CF.
    // We populate 'cfDescriptors' so that it results in a vector
    // containing packed CFs for every VBuckets, e.g. with
    // MaxShards=4:
    //     cfDescriptors[0] = default_0
    //     cfDescriptors[1] = seqno_0
    //     cfDescriptors[2] = default_4
    //     cfDescriptors[3] = seqno_4
    //     ..
    // That helps us in populating 'vbHandles' later, because after
    // 'rocksdb::DB::Open' handles[i] will be the handle that we will use
    // to operate on the ColumnFamily at cfDescriptors[i].
    std::vector<rocksdb::ColumnFamilyDescriptor> cfDescriptors;
    for (uint16_t vbid = 0; vbid < configuration.getMaxVBuckets(); vbid++) {
        // Only VBuckets mapped to this shard are considered.
        if ((vbid % configuration.getMaxShards()) ==
            configuration.getShardId()) {
            std::string defaultCF = "default_" + std::to_string(vbid);
            std::string seqnoCF = "local+seqno_" + std::to_string(vbid);
            if (std::find(cfs.begin(), cfs.end(), defaultCF) != cfs.end()) {
                if (std::find(cfs.begin(), cfs.end(), seqnoCF) == cfs.end()) {
                    throw std::logic_error("RocksDBKVStore::openDB: DB '" +
                                           dbname +
                                           "' is in inconsistent state: CF " +
                                           seqnoCF + " not found.");
                }
                cfDescriptors.emplace_back(defaultCF, defaultCFOptions);
                cfDescriptors.emplace_back(seqnoCF, seqnoCFOptions);
            }
        }
    }

    // TODO: The RocksDB built-in 'default' CF always exists, need to check if
    // we can drop it.
    cfDescriptors.emplace_back(rocksdb::kDefaultColumnFamilyName,
                               rocksdb::ColumnFamilyOptions());
    std::vector<rocksdb::ColumnFamilyHandle*> handles;
    rocksdb::DB* db;
    status = rocksdb::DB::Open(dbOptions, dbname, cfDescriptors, &handles, &db);
    if (!status.ok()) {
        throw std::runtime_error(
                "RocksDBKVStore::openDB: Open failed for database '" + dbname +
                "': " + status.getState());
    }
    rdb.reset(db);

    // The way we populated 'cfDescriptors' guarantees that: if 'cfDescriptors'
    // contains more than only the RocksDB 'default' CF (i.e.,
    // '(cfDescriptors.size() - 1) > 0') then 'cfDescriptors[i]' and
    // 'cfDescriptors[i+1]' are respectively the 'default_' and 'seqno'_ CFs
    // for a certain VBucket.
    for (uint16_t i = 0; i < (cfDescriptors.size() - 1); i += 2) {
        // Note: any further sanity-check is redundant as we will have always
        // 'cf = "default_<vbid>"'.
        const auto& cf = cfDescriptors[i].name;
        // Parse the vbid from the CF name ("default_" is 8 characters).
        uint16_t vbid = std::stoi(cf.substr(8));
        vbHandles[vbid] = std::make_shared<VBHandle>(
                *rdb, handles[i], handles[i + 1], vbid);
    }

    // We need to release the ColumnFamilyHandle for the built-in 'default' CF
    // here, as it is not managed by any VBHandle.
    rdb->DestroyColumnFamilyHandle(handles.back());
}
395
// Return the VBHandle for 'vbid', creating the VBucket's Column Families
// on the DB first if they do not exist yet.
// Throws std::runtime_error if the CF creation (or the cleanup after a
// failed creation) fails.
std::shared_ptr<VBHandle> RocksDBKVStore::getVBHandle(uint16_t vbid) {
    std::lock_guard<std::mutex> lg(vbhMutex);
    if (vbHandles[vbid]) {
        return vbHandles[vbid];
    }

    // If the VBHandle for vbid does not exist it means that we need to create
    // the VBucket, i.e. we need to create the set of CFs on DB for vbid
    std::vector<rocksdb::ColumnFamilyDescriptor> cfDescriptors;
    auto vbid_ = std::to_string(vbid);
    cfDescriptors.emplace_back("default_" + vbid_, defaultCFOptions);
    cfDescriptors.emplace_back("local+seqno_" + vbid_, seqnoCFOptions);

    std::vector<rocksdb::ColumnFamilyHandle*> handles;
    auto status = rdb->CreateColumnFamilies(cfDescriptors, &handles);
    if (!status.ok()) {
        // Best-effort rollback: drop any CF that was created before the
        // failure.
        // NOTE(review): the handles are dropped but never released via
        // DestroyColumnFamilyHandle() on this path - confirm whether this
        // leaks the handle objects.
        for (auto* cfh : handles) {
            status = rdb->DropColumnFamily(cfh);
            if (!status.ok()) {
                throw std::runtime_error(
                        "RocksDBKVStore::getVBHandle: DropColumnFamily failed "
                        "for CF " +
                        cfh->GetName() + ": " + status.getState());
            }
        }
        throw std::runtime_error(
                "RocksDBKVStore::getVBHandle: CreateColumnFamilies failed for "
                "vbid " +
                std::to_string(vbid) + ": " + status.getState());
    }

    vbHandles[vbid] =
            std::make_shared<VBHandle>(*rdb, handles[0], handles[1], vbid);

    // The number of VBuckets has increased, we need to re-balance the
    // Memtables Quota among the CFs of existing VBuckets.
    applyMemtablesQuota(lg);

    return vbHandles[vbid];
}
436
getDBSubdir()437 std::string RocksDBKVStore::getDBSubdir() {
438 return configuration.getDBName() + "/rocksdb." +
439 std::to_string(configuration.getShardId());
440 }
441
begin(std::unique_ptr<TransactionContext> txCtx)442 bool RocksDBKVStore::begin(std::unique_ptr<TransactionContext> txCtx) {
443 if (!txCtx) {
444 throw std::invalid_argument("RocksDBKVStore::begin: txCtx is null");
445 }
446 in_transaction = true;
447 transactionCtx = std::move(txCtx);
448 return in_transaction;
449 }
450
// Flush all pending requests of the current transaction to disk.
// Returns true on success (or when there is nothing to do), false if
// saveDocs failed; the transaction stays open on failure so it can be
// retried.
bool RocksDBKVStore::commit(const Item* collectionsManifest) {
    // This behaviour is to replicate the one in Couchstore.
    // If `commit` is called when not in transaction, just return true.
    if (!in_transaction) {
        return true;
    }

    if (pendingReqs.size() == 0) {
        in_transaction = false;
        return true;
    }

    // Swap `pendingReqs` with the temporary `commitBatch` so that we can
    // shorten the scope of the lock.
    std::vector<std::unique_ptr<RocksRequest>> commitBatch;
    {
        std::lock_guard<std::mutex> lock(writeMutex);
        std::swap(pendingReqs, commitBatch);
    }

    bool success = true;
    // All requests in a batch belong to the same VBucket.
    auto vbid = commitBatch[0]->getVBucketId();

    // Flush all documents to disk
    auto status = saveDocs(vbid, collectionsManifest, commitBatch);
    if (!status.ok()) {
        logger.log(EXTENSION_LOG_WARNING,
                   "RocksDBKVStore::commit: saveDocs error:%d, "
                   "vb:%" PRIu16,
                   status.code(),
                   vbid);
        success = false;
    }

    // Invoke the per-request persistence callbacks regardless of outcome.
    commitCallback(status, commitBatch);

    // This behaviour is to replicate the one in Couchstore.
    // Set `in_transaction = false` only if `commit` is successful.
    if (success) {
        in_transaction = false;
        transactionCtx.reset();
    }

    return success;
}
496
// Map a rocksdb::Status to the ep-engine mutation status code.
// Throws std::runtime_error for any status not explicitly handled.
static int getMutationStatus(rocksdb::Status status) {
    switch (status.code()) {
    case rocksdb::Status::Code::kOk:
        return MUTATION_SUCCESS;
    case rocksdb::Status::Code::kNotFound:
        // This return value causes ep-engine to drop the failed flush
        return DOC_NOT_FOUND;
    case rocksdb::Status::Code::kBusy:
        // This return value causes ep-engine to keep re-queueing the failed
        // flush
        return MUTATION_FAILED;
    default:
        throw std::runtime_error(
                std::string("getMutationStatus: RocksDB error:") +
                std::string(status.getState()));
    }
}
514
// Invoke the set/del persistence callback of every request in the batch
// and update the per-store IO statistics, based on the outcome of the
// batch write ('status' applies to the whole batch).
void RocksDBKVStore::commitCallback(
        rocksdb::Status status,
        const std::vector<std::unique_ptr<RocksRequest>>& commitBatch) {
    for (const auto& request : commitBatch) {
        auto dataSize = request->getDocMetaSlice().size() +
                        request->getDocBodySlice().size();
        const auto& key = request->getKey();
        /* update ep stats */
        ++st.io_num_write;
        st.io_write_bytes += (key.size() + dataSize);

        auto rv = getMutationStatus(status);
        if (request->isDelete()) {
            // status.code() != 0 means the write failed.
            if (status.code()) {
                ++st.numDelFailure;
            } else {
                st.delTimeHisto.add(request->getDelta() / 1000);
            }
            if (rv != -1) {
                // TODO: Should set `rv` to 1 or 0 depending on if this is a
                // delete to an existing (1) or non-existing (0) item. However,
                // to achieve this we would need to perform a Get to RocksDB
                // which is costly. For now just assume that the item did exist.
                rv = 1;
            }
            request->getDelCallback()->callback(*transactionCtx, rv);
        } else {
            if (status.code()) {
                ++st.numSetFailure;
            } else {
                st.writeTimeHisto.add(request->getDelta() / 1000);
                st.writeSizeHisto.add(dataSize + key.size());
            }
            // TODO: Should set `mr.second` to true or false depending on if
            // this is an insertion (true) or an update of an existing item
            // (false). However, to achieve this we would need to perform a Get
            // to RocksDB which is costly. For now just assume that the item
            // did not exist.
            mutation_result mr = std::make_pair(1, true);
            request->getSetCallback()->callback(*transactionCtx, mr);
        }
    }
}
558
rollback()559 void RocksDBKVStore::rollback() {
560 if (in_transaction) {
561 in_transaction = false;
562 transactionCtx.reset();
563 }
564 }
565
listPersistedVbuckets()566 std::vector<vbucket_state*> RocksDBKVStore::listPersistedVbuckets() {
567 std::vector<vbucket_state*> result;
568 for (const auto& vb : cachedVBStates) {
569 result.emplace_back(vb.get());
570 }
571 return result;
572 }
573
set(const Item& item, Callback<TransactionContext, mutation_result>& cb)574 void RocksDBKVStore::set(const Item& item,
575 Callback<TransactionContext, mutation_result>& cb) {
576 if (!in_transaction) {
577 throw std::logic_error(
578 "RocksDBKVStore::set: in_transaction must be true to perform a "
579 "set operation.");
580 }
581 MutationRequestCallback callback;
582 callback.setCb = &cb;
583 pendingReqs.push_back(std::make_unique<RocksRequest>(item, callback));
584 }
585
// Fetch a document by key; thin wrapper around getWithHeader requesting
// the full value (metadata + body).
GetValue RocksDBKVStore::get(const StoredDocKey& key,
                             uint16_t vb,
                             bool fetchDelete) {
    return getWithHeader(nullptr, key, vb, GetMetaOnly::No, fetchDelete);
}
591
getWithHeader(void* dbHandle, const StoredDocKey& key, uint16_t vb, GetMetaOnly getMetaOnly, bool fetchDelete)592 GetValue RocksDBKVStore::getWithHeader(void* dbHandle,
593 const StoredDocKey& key,
594 uint16_t vb,
595 GetMetaOnly getMetaOnly,
596 bool fetchDelete) {
597 std::string value;
598 const auto vbh = getVBHandle(vb);
599 // TODO RDB: use a PinnableSlice to avoid some memcpy
600 rocksdb::Slice keySlice = getKeySlice(key);
601 rocksdb::Status s = rdb->Get(
602 rocksdb::ReadOptions(), vbh->defaultCFH.get(), keySlice, &value);
603 if (!s.ok()) {
604 return GetValue{NULL, ENGINE_KEY_ENOENT};
605 }
606 return makeGetValue(vb, key, value, getMetaOnly);
607 }
608
getMulti(uint16_t vb, vb_bgfetch_queue_t& itms)609 void RocksDBKVStore::getMulti(uint16_t vb, vb_bgfetch_queue_t& itms) {
610 // TODO RDB: RocksDB supports a multi get which we should use here.
611 for (auto& it : itms) {
612 auto& key = it.first;
613 rocksdb::Slice keySlice = getKeySlice(key);
614 std::string value;
615 const auto vbh = getVBHandle(vb);
616 rocksdb::Status s = rdb->Get(rocksdb::ReadOptions(),
617 vbh->defaultCFH.get(),
618 keySlice,
619 &value);
620 if (s.ok()) {
621 it.second.value =
622 makeGetValue(vb, key, value, it.second.isMetaOnly);
623 GetValue* rv = &it.second.value;
624 for (auto& fetch : it.second.bgfetched_list) {
625 fetch->value = rv;
626 }
627 } else {
628 for (auto& fetch : it.second.bgfetched_list) {
629 fetch->value->setStatus(ENGINE_KEY_ENOENT);
630 }
631 }
632 }
633 }
634
// Reset (clear) the given VBucket. Currently a no-op.
void RocksDBKVStore::reset(uint16_t vbucketId) {
    // TODO RDB: Implement.
}
638
del(const Item& item, Callback<TransactionContext, int>& cb)639 void RocksDBKVStore::del(const Item& item,
640 Callback<TransactionContext, int>& cb) {
641 if (!item.isDeleted()) {
642 throw std::invalid_argument(
643 "RocksDBKVStore::del item to delete is not marked as deleted.");
644 }
645 if (!in_transaction) {
646 throw std::logic_error(
647 "RocksDBKVStore::del: in_transaction must be true to perform a "
648 "delete operation.");
649 }
650 // TODO: Deleted items remain as tombstones, but are not yet expired,
651 // they will accumuate forever.
652 MutationRequestCallback callback;
653 callback.delCb = &cb;
654 pendingReqs.push_back(std::make_unique<RocksRequest>(item, callback));
655 }
656
// Delete a VBucket: drop its Column Families once no other thread holds a
// reference to its VBHandle, then re-balance the Memtables quota.
// 'vb_version' is unused by this implementation.
void RocksDBKVStore::delVBucket(uint16_t vbid, uint64_t vb_version) {
    std::lock_guard<std::mutex> lg1(writeMutex);
    std::lock_guard<std::mutex> lg2(vbhMutex);

    if (!vbHandles[vbid]) {
        logger.log(EXTENSION_LOG_WARNING,
                   "RocksDBKVStore::delVBucket: VBucket not found, vb:%" PRIu16,
                   vbid);
        return;
    }

    // 'vbHandles' stores a shared_ptr to VBHandle for each VBucket . The
    // ownership of each pointer is shared among multiple threads performing
    // different operations (e.g., 'get' and 'commit').
    // We want to call 'DropColumnFamily' here rather than in other threads
    // because it is an expensive, IO-intensive operation and we do not want
    // it to cause another thread (possibly a front-end one) from being blocked
    // performing the drop.
    // So, the thread executing 'delVBucket' spins until it is the exclusive
    // owner of the shared_ptr (i.e., other concurrent threads like 'commit'
    // have completed and do not own any copy of the shared_ptr).
    {
        std::shared_ptr<VBHandle> sharedPtr;
        std::swap(vbHandles[vbid], sharedPtr);
        // NOTE(review): shared_ptr::unique() is deprecated in C++17
        // (use_count() == 1 is the equivalent) - consider migrating.
        while (!sharedPtr.unique()) {
            std::this_thread::sleep_for(std::chrono::microseconds(100));
        }
        // Drop all the CF for vbid.
        sharedPtr->dropColumnFamilies();
    }

    // The number of VBuckets has decreased, we need to re-balance the
    // Memtables Quota among the CFs of existing VBuckets.
    applyMemtablesQuota(lg2);
}
692
// Persist the VBucket state if the cached state changed and 'options'
// requests persistence. Returns false only if writing the state to the
// DB failed; returns true when nothing needed persisting.
bool RocksDBKVStore::snapshotVBucket(uint16_t vbucketId,
                                     const vbucket_state& vbstate,
                                     VBStatePersist options) {
    // TODO RDB: Refactor out behaviour common to this and CouchKVStore
    auto start = ProcessClock::now();

    if (updateCachedVBState(vbucketId, vbstate) &&
        (options == VBStatePersist::VBSTATE_PERSIST_WITHOUT_COMMIT ||
         options == VBStatePersist::VBSTATE_PERSIST_WITH_COMMIT)) {
        const auto vbh = getVBHandle(vbucketId);
        rocksdb::WriteBatch batch;
        auto status = saveVBStateToBatch(*vbh, vbstate, batch);
        if (!status.ok()) {
            logger.log(EXTENSION_LOG_WARNING,
                       "RocksDBKVStore::snapshotVBucket: saveVBStateToBatch() "
                       "failed state:%s vb:%" PRIu16 " :%s",
                       VBucket::toString(vbstate.state),
                       vbucketId,
                       status.getState());
            return false;
        }
        status = rdb->Write(writeOptions, &batch);
        if (!status.ok()) {
            logger.log(EXTENSION_LOG_WARNING,
                       "RocksDBKVStore::snapshotVBucket: Write() "
                       "failed state:%s vb:%" PRIu16 " :%s",
                       VBucket::toString(vbstate.state),
                       vbucketId,
                       status.getState());
            return false;
        }
    }

    LOG(EXTENSION_LOG_DEBUG,
        "RocksDBKVStore::snapshotVBucket: Snapshotted vbucket:%" PRIu16
        " state:%s",
        vbucketId,
        vbstate.toJSON().c_str());

    st.snapshotHisto.add(std::chrono::duration_cast<std::chrono::microseconds>(
            ProcessClock::now() - start));

    return true;
}
737
// Persist engine stats. Currently a no-op that reports success.
bool RocksDBKVStore::snapshotStats(const std::map<std::string, std::string>&) {
    // TODO RDB: Implement
    return true;
}
742
// Remove on-disk data of invalid VBuckets. Currently a no-op.
void RocksDBKVStore::destroyInvalidVBuckets(bool) {
    // TODO RDB: implement
}
746
// Number of shards, as configured for this KVStore.
size_t RocksDBKVStore::getNumShards() {
    return configuration.getMaxShards();
}
750
// Look up a single named statistic and store it in 'value'.
// Returns true if the stat name is known and could be retrieved, false
// otherwise. The stat names form an external contract; do not rename.
bool RocksDBKVStore::getStat(const char* name_, size_t& value) {
    std::string name(name_);

    // Memory Usage
    if (name == "kMemTableTotal") {
        return getStatFromMemUsage(rocksdb::MemoryUtil::kMemTableTotal, value);
    } else if (name == "kMemTableUnFlushed") {
        return getStatFromMemUsage(rocksdb::MemoryUtil::kMemTableUnFlushed,
                                   value);
    } else if (name == "kTableReadersTotal") {
        return getStatFromMemUsage(rocksdb::MemoryUtil::kTableReadersTotal,
                                   value);
    } else if (name == "kCacheTotal") {
        return getStatFromMemUsage(rocksdb::MemoryUtil::kCacheTotal, value);
    }

    // MemTable Size per Column Family
    else if (name == "default_kSizeAllMemTables") {
        return getStatFromProperties(ColumnFamily::Default,
                                     rocksdb::DB::Properties::kSizeAllMemTables,
                                     value);
    } else if (name == "seqno_kSizeAllMemTables") {
        return getStatFromProperties(ColumnFamily::Seqno,
                                     rocksdb::DB::Properties::kSizeAllMemTables,
                                     value);
    }

    // Block Cache hit/miss
    else if (name == "rocksdb.block.cache.hit") {
        return getStatFromStatistics(rocksdb::Tickers::BLOCK_CACHE_HIT, value);
    } else if (name == "rocksdb.block.cache.miss") {
        return getStatFromStatistics(rocksdb::Tickers::BLOCK_CACHE_MISS, value);
    } else if (name == "rocksdb.block.cache.data.hit") {
        return getStatFromStatistics(rocksdb::Tickers::BLOCK_CACHE_DATA_HIT,
                                     value);
    } else if (name == "rocksdb.block.cache.data.miss") {
        return getStatFromStatistics(rocksdb::Tickers::BLOCK_CACHE_DATA_MISS,
                                     value);
    } else if (name == "rocksdb.block.cache.index.hit") {
        return getStatFromStatistics(rocksdb::Tickers::BLOCK_CACHE_INDEX_HIT,
                                     value);
    } else if (name == "rocksdb.block.cache.index.miss") {
        return getStatFromStatistics(rocksdb::Tickers::BLOCK_CACHE_INDEX_MISS,
                                     value);
    } else if (name == "rocksdb.block.cache.filter.hit") {
        return getStatFromStatistics(rocksdb::Tickers::BLOCK_CACHE_FILTER_HIT,
                                     value);
    } else if (name == "rocksdb.block.cache.filter.miss") {
        return getStatFromStatistics(rocksdb::Tickers::BLOCK_CACHE_FILTER_MISS,
                                     value);
    }

    // Disk Usage per Column Family
    else if (name == "default_kTotalSstFilesSize") {
        return getStatFromProperties(
                ColumnFamily::Default,
                rocksdb::DB::Properties::kTotalSstFilesSize,
                value);
    } else if (name == "seqno_kTotalSstFilesSize") {
        return getStatFromProperties(
                ColumnFamily::Seqno,
                rocksdb::DB::Properties::kTotalSstFilesSize,
                value);
    }

    // Scan stats
    else if (name == "scan_totalSeqnoHits") {
        value = scanTotalSeqnoHits.load();
        return true;
    } else if (name == "scan_oldSeqnoHits") {
        value = scanOldSeqnoHits.load();
        return true;
    }

    return false;
}
827
getStorageProperties(void)828 StorageProperties RocksDBKVStore::getStorageProperties(void) {
829 StorageProperties rv(StorageProperties::EfficientVBDump::Yes,
830 StorageProperties::EfficientVBDeletion::Yes,
831 StorageProperties::PersistedDeletion::No,
832 // TODO RDB: Not strictly true, multiGet
833 // does not yet use the underlying multi get
834 // of RocksDB
835 StorageProperties::EfficientGet::Yes,
836 StorageProperties::ConcurrentWriteCompact::Yes);
837 return rv;
838 }
839
// Collect the distinct rocksdb::Cache instances in use, for memory
// accounting.
std::unordered_set<const rocksdb::Cache*> RocksDBKVStore::getCachePointers() {
    std::unordered_set<const rocksdb::Cache*> cache_set;

    // TODO: Cache from DBImpl. The 'std::shared_ptr<Cache>
    // table_cache_' pointer is not exposed through the 'DB' interface

    // Cache from DBOptions
    // Note: we do not use the 'row_cache' currently.
    // NOTE(review): if row_cache is unset this inserts a nullptr entry -
    // presumably callers tolerate that; confirm.
    cache_set.insert(rdb->GetDBOptions().row_cache.get());

    // Cache from table factories.
    addCFBlockCachePointers(defaultCFOptions, cache_set);
    addCFBlockCachePointers(seqnoCFOptions, cache_set);

    return cache_set;
}
856
// Add the block caches of a Column Family's table factory to 'cache_set'.
// NOTE(review): the static_cast assumes the table factory is block-based
// (BlockBasedTableOptions) - confirm no other factory type is configured.
void RocksDBKVStore::addCFBlockCachePointers(
        const rocksdb::ColumnFamilyOptions& cfOptions,
        std::unordered_set<const rocksdb::Cache*>& cache_set) {
    if (cfOptions.table_factory) {
        auto* table_options = cfOptions.table_factory->GetOptions();
        auto* bbt_options =
                static_cast<rocksdb::BlockBasedTableOptions*>(table_options);
        cache_set.insert(bbt_options->block_cache.get());
        cache_set.insert(bbt_options->block_cache_compressed.get());
    }
}
868
getStatsLevel( const std::string& stats_level)869 rocksdb::StatsLevel RocksDBKVStore::getStatsLevel(
870 const std::string& stats_level) {
871 if (stats_level == "kExceptDetailedTimers") {
872 return rocksdb::StatsLevel::kExceptDetailedTimers;
873 } else if (stats_level == "kExceptTimeForMutex") {
874 return rocksdb::StatsLevel::kExceptTimeForMutex;
875 } else if (stats_level == "kAll") {
876 return rocksdb::StatsLevel::kAll;
877 } else {
878 throw std::invalid_argument(
879 std::string("RocksDBKVStore::getStatsLevel: stats_level: '") +
880 stats_level + std::string("'"));
881 }
882 }
883
// Wrap a DocKey's bytes in a non-owning rocksdb::Slice; the key must
// outlive the returned Slice.
rocksdb::Slice RocksDBKVStore::getKeySlice(const DocKey& key) {
    return rocksdb::Slice(reinterpret_cast<const char*>(key.data()),
                          key.size());
}
888
getSeqnoSlice(const int64_t* seqno)889 rocksdb::Slice RocksDBKVStore::getSeqnoSlice(const int64_t* seqno) {
890 return rocksdb::Slice(reinterpret_cast<const char*>(seqno), sizeof(*seqno));
891 }
892
getNumericSeqno(const rocksdb::Slice& seqnoSlice)893 int64_t RocksDBKVStore::getNumericSeqno(const rocksdb::Slice& seqnoSlice) {
894 assert(seqnoSlice.size() == sizeof(int64_t));
895 int64_t seqno;
896 std::memcpy(&seqno, seqnoSlice.data(), seqnoSlice.size());
897 return seqno;
898 }
899
makeItem(uint16_t vb, const DocKey& key, const rocksdb::Slice& s, GetMetaOnly getMetaOnly)900 std::unique_ptr<Item> RocksDBKVStore::makeItem(uint16_t vb,
901 const DocKey& key,
902 const rocksdb::Slice& s,
903 GetMetaOnly getMetaOnly) {
904 assert(s.size() >= sizeof(rockskv::MetaData));
905
906 const char* data = s.data();
907
908 rockskv::MetaData meta;
909 std::memcpy(&meta, data, sizeof(meta));
910 data += sizeof(meta);
911
912 bool includeValue = getMetaOnly == GetMetaOnly::No && meta.valueSize;
913
914 auto item = std::make_unique<Item>(key,
915 meta.flags,
916 meta.exptime,
917 includeValue ? data : nullptr,
918 includeValue ? meta.valueSize : 0,
919 meta.datatype,
920 meta.cas,
921 meta.bySeqno,
922 vb,
923 meta.revSeqno);
924
925 if (meta.deleted) {
926 item->setDeleted();
927 }
928
929 return item;
930 }
931
makeGetValue(uint16_t vb, const DocKey& key, const std::string& value, GetMetaOnly getMetaOnly)932 GetValue RocksDBKVStore::makeGetValue(uint16_t vb,
933 const DocKey& key,
934 const std::string& value,
935 GetMetaOnly getMetaOnly) {
936 rocksdb::Slice sval(value);
937 return GetValue(
938 makeItem(vb, key, sval, getMetaOnly), ENGINE_SUCCESS, -1, 0);
939 }
940
void RocksDBKVStore::readVBState(const VBHandle& vbh) {
    // Reads the persisted vbucket_state JSON document for the given VBucket
    // and publishes it into 'cachedVBStates[vbh.vbid]'. If the document is
    // missing or malformed, the defaults initialized below are used.
    // Largely copied from CouchKVStore
    // TODO RDB: refactor out sections common to CouchKVStore
    vbucket_state_t state = vbucket_state_dead;
    uint64_t checkpointId = 0;
    uint64_t maxDeletedSeqno = 0;
    int64_t highSeqno = readHighSeqnoFromDisk(vbh);
    std::string failovers;
    uint64_t purgeSeqno = 0;
    uint64_t lastSnapStart = 0;
    uint64_t lastSnapEnd = 0;
    uint64_t maxCas = 0;
    int64_t hlcCasEpochSeqno = HlcCasSeqnoUninitialised;
    bool mightContainXattrs = false;

    // The state document lives in the seqno CF under a reserved
    // (negative) key - see getVbstateKey().
    auto key = getVbstateKey();
    std::string vbstate;
    auto vbid = vbh.vbid;
    auto status = rdb->Get(rocksdb::ReadOptions(),
                           vbh.seqnoCFH.get(),
                           getSeqnoSlice(&key),
                           &vbstate);
    if (!status.ok()) {
        if (status.IsNotFound()) {
            // Not an error: a newly-created vbucket has no state doc yet.
            logger.log(EXTENSION_LOG_NOTICE,
                       "RocksDBKVStore::readVBState: '_local/vbstate.%" PRIu16
                       "' not found",
                       vbid);
        } else {
            logger.log(EXTENSION_LOG_WARNING,
                       "RocksDBKVStore::readVBState: error getting vbstate "
                       "error:%s, vb:%" PRIu16,
                       status.getState(),
                       vbid);
        }
    } else {
        cJSON* jsonObj = cJSON_Parse(vbstate.c_str());
        if (!jsonObj) {
            // NOTE(review): on parse failure this only logs and then falls
            // through to the cJSON_GetObjectItem() calls below with a null
            // 'jsonObj' - this relies on cJSON tolerating a NULL object
            // argument (returning NULL); confirm.
            logger.log(EXTENSION_LOG_WARNING,
                       "RocksKVStore::readVBState: Failed to parse the vbstat "
                       "json doc for vb:%" PRIu16 ", json:%s",
                       vbid,
                       vbstate.c_str());
        }

        // Extract each field as a string; getJSONObjString() yields "" for
        // a missing item, which is how absence is detected below.
        const std::string vb_state =
                getJSONObjString(cJSON_GetObjectItem(jsonObj, "state"));
        const std::string checkpoint_id =
                getJSONObjString(cJSON_GetObjectItem(jsonObj, "checkpoint_id"));
        const std::string max_deleted_seqno = getJSONObjString(
                cJSON_GetObjectItem(jsonObj, "max_deleted_seqno"));
        const std::string snapStart =
                getJSONObjString(cJSON_GetObjectItem(jsonObj, "snap_start"));
        const std::string snapEnd =
                getJSONObjString(cJSON_GetObjectItem(jsonObj, "snap_end"));
        const std::string maxCasValue =
                getJSONObjString(cJSON_GetObjectItem(jsonObj, "max_cas"));
        const std::string hlcCasEpoch =
                getJSONObjString(cJSON_GetObjectItem(jsonObj, "hlc_epoch"));
        mightContainXattrs = getJSONObjBool(
                cJSON_GetObjectItem(jsonObj, "might_contain_xattrs"));

        cJSON* failover_json = cJSON_GetObjectItem(jsonObj, "failover_table");
        // 'state', 'checkpoint_id' and 'max_deleted_seqno' are mandatory;
        // if any is missing, treat the document as malformed and keep the
        // defaults.
        if (vb_state.compare("") == 0 || checkpoint_id.compare("") == 0 ||
            max_deleted_seqno.compare("") == 0) {
            logger.log(EXTENSION_LOG_WARNING,
                       "RocksDBKVStore::readVBState: State"
                       " JSON doc for vb:%" PRIu16
                       " is in the wrong format:%s, "
                       "vb state:%s, checkpoint id:%s and max deleted seqno:%s",
                       vbid,
                       vbstate.c_str(),
                       vb_state.c_str(),
                       checkpoint_id.c_str(),
                       max_deleted_seqno.c_str());
        } else {
            state = VBucket::fromString(vb_state.c_str());
            maxDeletedSeqno = std::stoull(max_deleted_seqno);
            checkpointId = std::stoull(checkpoint_id);

            // Missing snapshot markers default to the on-disk high seqno.
            if (snapStart.compare("") == 0) {
                lastSnapStart = highSeqno;
            } else {
                lastSnapStart = std::stoull(snapStart.c_str());
            }

            if (snapEnd.compare("") == 0) {
                lastSnapEnd = highSeqno;
            } else {
                lastSnapEnd = std::stoull(snapEnd.c_str());
            }

            if (maxCasValue.compare("") != 0) {
                maxCas = std::stoull(maxCasValue.c_str());
            }

            if (!hlcCasEpoch.empty()) {
                hlcCasEpochSeqno = std::stoull(hlcCasEpoch);
            }

            if (failover_json) {
                failovers = to_string(failover_json, false);
            }
        }
        cJSON_Delete(jsonObj);
    }

    // Publish the (possibly defaulted) state into the in-memory cache.
    cachedVBStates[vbh.vbid] =
            std::make_unique<vbucket_state>(state,
                                            checkpointId,
                                            maxDeletedSeqno,
                                            highSeqno,
                                            purgeSeqno,
                                            lastSnapStart,
                                            lastSnapEnd,
                                            maxCas,
                                            hlcCasEpochSeqno,
                                            mightContainXattrs,
                                            failovers);
}
1061
saveVBStateToBatch(const VBHandle& vbh, const vbucket_state& vbState, rocksdb::WriteBatch& batch)1062 rocksdb::Status RocksDBKVStore::saveVBStateToBatch(const VBHandle& vbh,
1063 const vbucket_state& vbState,
1064 rocksdb::WriteBatch& batch) {
1065 std::stringstream jsonState;
1066
1067 jsonState << "{\"state\": \"" << VBucket::toString(vbState.state) << "\""
1068 << ",\"checkpoint_id\": \"" << vbState.checkpointId << "\""
1069 << ",\"max_deleted_seqno\": \"" << vbState.maxDeletedSeqno
1070 << "\"";
1071 if (!vbState.failovers.empty()) {
1072 jsonState << ",\"failover_table\": " << vbState.failovers;
1073 }
1074 jsonState << ",\"snap_start\": \"" << vbState.lastSnapStart << "\""
1075 << ",\"snap_end\": \"" << vbState.lastSnapEnd << "\""
1076 << ",\"max_cas\": \"" << vbState.maxCas << "\""
1077 << ",\"hlc_epoch\": \"" << vbState.hlcCasEpochSeqno << "\"";
1078
1079 if (vbState.mightContainXattrs) {
1080 jsonState << ",\"might_contain_xattrs\": true";
1081 } else {
1082 jsonState << ",\"might_contain_xattrs\": false";
1083 }
1084
1085 jsonState << "}";
1086
1087 auto key = getVbstateKey();
1088 rocksdb::Slice keySlice = getSeqnoSlice(&key);
1089 return batch.Put(vbh.seqnoCFH.get(), keySlice, jsonState.str());
1090 }
1091
getBaselineDefaultCFOptions()1092 rocksdb::ColumnFamilyOptions RocksDBKVStore::getBaselineDefaultCFOptions() {
1093 rocksdb::ColumnFamilyOptions cfOptions;
1094 // Enable Point Lookup Optimization for the 'default' Column Family
1095 // Note: whatever we give in input as 'block_cache_size_mb', the Block
1096 // Cache will be reset with the shared 'blockCache' of size
1097 // 'rocksdb_block_cache_size'
1098 cfOptions.OptimizeForPointLookup(1);
1099 return cfOptions;
1100 }
1101
getBaselineSeqnoCFOptions()1102 rocksdb::ColumnFamilyOptions RocksDBKVStore::getBaselineSeqnoCFOptions() {
1103 rocksdb::ColumnFamilyOptions cfOptions;
1104 cfOptions.comparator = &seqnoComparator;
1105 return cfOptions;
1106 }
1107
void RocksDBKVStore::applyUserCFOptions(rocksdb::ColumnFamilyOptions& cfOptions,
                                        const std::string& newCfOptions,
                                        const std::string& newBbtOptions) {
    // Applies the user-provided option strings (from configuration) on top
    // of the given baseline Column Family options, in place.
    // @param cfOptions baseline options; updated in place
    // @param newCfOptions ColumnFamilyOptions overrides, as a RocksDB
    //        option string
    // @param newBbtOptions BlockBasedTableOptions overrides, as a RocksDB
    //        option string
    // @throws std::invalid_argument if either option string fails to parse
    //
    // Apply 'newCfOptions' on top of 'cfOptions'
    // NOTE(review): 'cfOptions' is used as both the base and the output
    // argument here; this appears to work with RocksDB's
    // GetColumnFamilyOptionsFromString - confirm it is supported.
    auto status = rocksdb::GetColumnFamilyOptionsFromString(
            cfOptions, newCfOptions, &cfOptions);
    if (!status.ok()) {
        throw std::invalid_argument(
                std::string("RocksDBKVStore::applyUserCFOptions: "
                            "GetColumnFamilyOptionsFromString error: ") +
                status.getState());
    }

    // RocksDB ColumnFamilyOptions provide advanced options for the
    // Block Based Table file format, which is the default format for SST files.
    // Apply 'newBbtOptions' on top of the current BlockBasedTableOptions of
    // 'cfOptions'
    rocksdb::BlockBasedTableOptions baseOptions;
    if (cfOptions.table_factory) {
        auto* bbtOptions = cfOptions.table_factory->GetOptions();
        if (bbtOptions) {
            baseOptions = *(
                    static_cast<rocksdb::BlockBasedTableOptions*>(bbtOptions));
        }
    }

    rocksdb::BlockBasedTableOptions tableOptions;
    status = rocksdb::GetBlockBasedTableOptionsFromString(
            baseOptions, newBbtOptions, &tableOptions);
    if (!status.ok()) {
        throw std::invalid_argument(
                std::string("RocksDBKVStore::applyUserCFOptions: "
                            "GetBlockBasedTableOptionsFromString error: ") +
                status.getState());
    }

    // If using Partitioned Filters, then use the RocksDB recommended params
    // (https://github.com/facebook/rocksdb/blob/master/include/rocksdb/filter_policy.h#L133):
    //     "bits_per_key: bits per key in bloom filter. A good value for
    //           bits_per_key is 10, which yields a filter with ~1% false
    //           positive rate.
    //       use_block_based_builder: use block based filter rather than full
    //           filter. If you want to build a full filter, it needs to be
    //           set to false."
    if (tableOptions.partition_filters == true) {
        tableOptions.filter_policy.reset(
                rocksdb::NewBloomFilterPolicy(10, false));
    }

    // Always use the per-shard shared Block Cache. If it is nullptr, RocksDB
    // will allocate a default size Block Cache.
    tableOptions.block_cache = blockCache;

    // Set the new BlockBasedTableOptions
    cfOptions.table_factory.reset(
            rocksdb::NewBlockBasedTableFactory(tableOptions));

    // Set the user-provided size amplification factor if under Universal
    // Compaction
    if (cfOptions.compaction_style ==
        rocksdb::CompactionStyle::kCompactionStyleUniversal) {
        auto& configuration =
                dynamic_cast<RocksDBKVStoreConfig&>(this->configuration);
        cfOptions.compaction_options_universal.max_size_amplification_percent =
                configuration.getUCMaxSizeAmplificationPercent();
    }
}
1175
rocksdb::Status RocksDBKVStore::writeAndTimeBatch(rocksdb::WriteBatch batch) {
    // Writes 'batch' to the DB, recording the elapsed time in the commit
    // histogram.
    // NOTE(review): 'batch' is taken by value, so every call copies the
    // (potentially large) WriteBatch. Consider passing by reference;
    // changing this also requires updating the declaration in the header.
    auto begin = ProcessClock::now();
    auto status = rdb->Write(writeOptions, &batch);
    st.commitHisto.add(std::chrono::duration_cast<std::chrono::microseconds>(
            ProcessClock::now() - begin));
    return status;
}
1183
saveDocs( uint16_t vbid, const Item* collectionsManifest, const std::vector<std::unique_ptr<RocksRequest>>& commitBatch)1184 rocksdb::Status RocksDBKVStore::saveDocs(
1185 uint16_t vbid,
1186 const Item* collectionsManifest,
1187 const std::vector<std::unique_ptr<RocksRequest>>& commitBatch) {
1188 auto reqsSize = commitBatch.size();
1189 if (reqsSize == 0) {
1190 st.docsCommitted = 0;
1191 return rocksdb::Status::OK();
1192 }
1193
1194 auto& vbstate = cachedVBStates[vbid];
1195 if (vbstate == nullptr) {
1196 throw std::logic_error("RocksDBKVStore::saveDocs: cachedVBStates[" +
1197 std::to_string(vbid) + "] is NULL");
1198 }
1199
1200 rocksdb::Status status;
1201 int64_t maxDBSeqno = 0;
1202 rocksdb::WriteBatch batch;
1203
1204 const auto vbh = getVBHandle(vbid);
1205
1206 for (const auto& request : commitBatch) {
1207 int64_t bySeqno = request->getDocMeta().bySeqno;
1208 maxDBSeqno = std::max(maxDBSeqno, bySeqno);
1209
1210 status = addRequestToWriteBatch(*vbh, batch, request.get());
1211 if (!status.ok()) {
1212 logger.log(EXTENSION_LOG_WARNING,
1213 "RocksDBKVStore::saveDocs: addRequestToWriteBatch "
1214 "error:%d, vb:%" PRIu16,
1215 status.code(),
1216 vbid);
1217 return status;
1218 }
1219
1220 // Check if we should split into a new writeBatch if the batch size
1221 // exceeds the write_buffer_size - this is necessary because we
1222 // don't want our WriteBatch to exceed the configured memtable size, as
1223 // that can cause significant memory bloating (see MB-26521).
1224 // Note the limit check is only approximate, as the batch contains
1225 // updates for at least 2 CFs (key & seqno) which will be written into
1226 // separate memtables, so we don't exactly know the size contribution
1227 // to each memtable in the batch.
1228 const auto batchLimit = defaultCFOptions.write_buffer_size +
1229 seqnoCFOptions.write_buffer_size;
1230 if (batch.GetDataSize() > batchLimit) {
1231 status = writeAndTimeBatch(batch);
1232 if (!status.ok()) {
1233 logger.log(EXTENSION_LOG_WARNING,
1234 "RocksDBKVStore::saveDocs: rocksdb::DB::Write "
1235 "error:%d, "
1236 "vb:%" PRIu16,
1237 status.code(),
1238 vbid);
1239 return status;
1240 }
1241 batch.Clear();
1242 }
1243 }
1244
1245 status = saveVBStateToBatch(*vbh, *vbstate, batch);
1246 if (!status.ok()) {
1247 logger.log(EXTENSION_LOG_WARNING,
1248 "RocksDBKVStore::saveDocs: saveVBStateToBatch error:%d",
1249 status.code());
1250 return status;
1251 }
1252
1253 status = writeAndTimeBatch(batch);
1254 if (!status.ok()) {
1255 logger.log(EXTENSION_LOG_WARNING,
1256 "RocksDBKVStore::saveDocs: rocksdb::DB::Write error:%d, "
1257 "vb:%" PRIu16,
1258 status.code(),
1259 vbid);
1260 return status;
1261 }
1262
1263 st.batchSize.add(reqsSize);
1264 st.docsCommitted = reqsSize;
1265
1266 // Update high seqno
1267 vbstate->highSeqno = maxDBSeqno;
1268
1269 return rocksdb::Status::OK();
1270 }
1271
addRequestToWriteBatch( const VBHandle& vbh, rocksdb::WriteBatch& batch, RocksRequest* request)1272 rocksdb::Status RocksDBKVStore::addRequestToWriteBatch(
1273 const VBHandle& vbh,
1274 rocksdb::WriteBatch& batch,
1275 RocksRequest* request) {
1276 uint16_t vbid = request->getVBucketId();
1277
1278 rocksdb::Slice keySlice = getKeySlice(request->getKey());
1279 rocksdb::SliceParts keySliceParts(&keySlice, 1);
1280
1281 rocksdb::Slice docSlices[] = {request->getDocMetaSlice(),
1282 request->getDocBodySlice()};
1283 rocksdb::SliceParts valueSliceParts(docSlices, 2);
1284
1285 rocksdb::Slice bySeqnoSlice = getSeqnoSlice(&request->getDocMeta().bySeqno);
1286 // We use the `saveDocsHisto` to track the time spent on
1287 // `rocksdb::WriteBatch::Put()`.
1288 auto begin = ProcessClock::now();
1289 auto status =
1290 batch.Put(vbh.defaultCFH.get(), keySliceParts, valueSliceParts);
1291 if (!status.ok()) {
1292 logger.log(EXTENSION_LOG_WARNING,
1293 "RocksDBKVStore::saveDocs: rocksdb::WriteBatch::Put "
1294 "[ColumnFamily: \'default\'] error:%d, "
1295 "vb:%" PRIu16,
1296 status.code(),
1297 vbid);
1298 return status;
1299 }
1300 status = batch.Put(vbh.seqnoCFH.get(), bySeqnoSlice, keySlice);
1301 if (!status.ok()) {
1302 logger.log(EXTENSION_LOG_WARNING,
1303 "RocksDBKVStore::saveDocs: rocksdb::WriteBatch::Put "
1304 "[ColumnFamily: \'seqno\'] error:%d, "
1305 "vb:%" PRIu16,
1306 status.code(),
1307 vbid);
1308 return status;
1309 }
1310 st.saveDocsHisto.add(std::chrono::duration_cast<std::chrono::microseconds>(
1311 ProcessClock::now() - begin));
1312
1313 return rocksdb::Status::OK();
1314 }
1315
readHighSeqnoFromDisk(const VBHandle& vbh)1316 int64_t RocksDBKVStore::readHighSeqnoFromDisk(const VBHandle& vbh) {
1317 std::unique_ptr<rocksdb::Iterator> it(
1318 rdb->NewIterator(rocksdb::ReadOptions(), vbh.seqnoCFH.get()));
1319
1320 // Seek to the highest seqno=>key mapping stored for the vbid
1321 auto maxSeqno = std::numeric_limits<int64_t>::max();
1322 rocksdb::Slice maxSeqnoSlice = getSeqnoSlice(&maxSeqno);
1323 it->SeekForPrev(maxSeqnoSlice);
1324
1325 if (!it->Valid()) {
1326 return 0;
1327 }
1328 auto highSeqno = getNumericSeqno(it->key());
1329 // We use a negative seqno as key for VBState. Do not consider it.
1330 return highSeqno >= 0 ? highSeqno : 0;
1331 }
1332
getVbstateKey()1333 int64_t RocksDBKVStore::getVbstateKey() {
1334 // We put the VBState into the SeqnoCF. As items in the SeqnoCF are ordered
1335 // by increasing-seqno, we reserve a negative special key to VBState so
1336 // that we can access it in O(1).
1337 return -9999;
1338 }
1339
ScanContext* RocksDBKVStore::initScanContext(
        std::shared_ptr<StatusCallback<GetValue>> cb,
        std::shared_ptr<StatusCallback<CacheLookup>> cl,
        uint16_t vbid,
        uint64_t startSeqno,
        DocumentFilter options,
        ValueFilter valOptions) {
    // Creates a new scan over the given vbucket starting at 'startSeqno'.
    // A RocksDB snapshot is taken now (keyed by scanId) so that subsequent
    // scan() calls observe a consistent view of the data. The caller must
    // release the returned context via destroyScanContext().
    size_t scanId = scanCounter++;

    {
        std::lock_guard<std::mutex> lg(scanSnapshotsMutex);
        scanSnapshots.emplace(scanId, SnapshotPtr(rdb->GetSnapshot(), *rdb));
    }

    // As we cannot efficiently determine how many documents this scan will
    // find, we approximate this value with the seqno difference + 1
    // as scan is supposed to be inclusive at both ends,
    // seqnos 2 to 4 covers 3 docs not 4 - 2 = 2
    // NOTE(review): if startSeqno > highSeqno, the unsigned subtraction
    // below underflows, yielding a huge documentCount estimate - confirm
    // callers never request a start beyond the high seqno.

    uint64_t endSeqno = cachedVBStates[vbid]->highSeqno;
    return new ScanContext(cb,
                           cl,
                           vbid,
                           scanId,
                           startSeqno,
                           endSeqno,
                           0, /*TODO RDB: pass the real purge-seqno*/
                           options,
                           valOptions,
                           /* documentCount */ endSeqno - startSeqno + 1,
                           configuration);
}
1372
scan_error_t RocksDBKVStore::scan(ScanContext* ctx) {
    // Executes (or resumes) the scan described by 'ctx': walks the seqno CF
    // within [startSeqno, maxSeqno], fetches each document from the default
    // CF and feeds it through the cache-lookup and getvalue callbacks.
    // Returns scan_again when a callback reports ENGINE_ENOMEM (caller
    // should retry later), scan_success when the range is exhausted.
    if (!ctx) {
        return scan_failed;
    }

    // Already completed on a previous invocation.
    if (ctx->lastReadSeqno == ctx->maxSeqno) {
        return scan_success;
    }

    // Resume from just after the last successfully processed seqno.
    auto startSeqno = ctx->startSeqno;
    if (ctx->lastReadSeqno != 0) {
        startSeqno = ctx->lastReadSeqno + 1;
    }

    TRACE_EVENT2("RocksDBKVStore",
                 "scan",
                 "vbid",
                 ctx->vbid,
                 "startSeqno",
                 startSeqno);

    GetMetaOnly isMetaOnly = ctx->valFilter == ValueFilter::KEYS_ONLY
                                     ? GetMetaOnly::Yes
                                     : GetMetaOnly::No;

    // Read through the snapshot taken at initScanContext() time, so every
    // chunk of this scan sees a consistent view.
    rocksdb::ReadOptions snapshotOpts{rocksdb::ReadOptions()};

    // Lock for safe access to the scanSnapshots map and to ensure the snapshot
    // doesn't get destroyed whilst we have the pointer.
    // @todo use a shared_ptr and reduce the lock scope to just the map::at call
    std::lock_guard<std::mutex> lg(scanSnapshotsMutex);
    snapshotOpts.snapshot = scanSnapshots.at(ctx->scanId).get();

    rocksdb::Slice startSeqnoSlice = getSeqnoSlice(&startSeqno);
    const auto vbh = getVBHandle(ctx->vbid);
    std::unique_ptr<rocksdb::Iterator> it(
            rdb->NewIterator(snapshotOpts, vbh->seqnoCFH.get()));
    if (!it) {
        throw std::logic_error(
                "RocksDBKVStore::scan: rocksdb::Iterator to Seqno Column "
                "Family is nullptr");
    }
    it->Seek(startSeqnoSlice);

    // The scan is inclusive of maxSeqno: stop once the iterator moves past
    // it (ordering per the custom seqno comparator).
    rocksdb::Slice endSeqnoSlice = getSeqnoSlice(&ctx->maxSeqno);
    auto isPastEnd = [&endSeqnoSlice, this](rocksdb::Slice seqSlice) {
        return seqnoComparator.Compare(seqSlice, endSeqnoSlice) == 1;
    };

    for (; it->Valid() && !isPastEnd(it->key()); it->Next()) {
        scanTotalSeqnoHits++;
        // The seqno CF maps seqno => key; fetch the document itself from
        // the default CF.
        auto seqno = getNumericSeqno(it->key());
        rocksdb::Slice keySlice = it->value();
        std::string valueStr;
        auto s = rdb->Get(
                snapshotOpts, vbh->defaultCFH.get(), keySlice, &valueStr);

        if (!s.ok()) {
            // TODO RDB: Old seqnos are never removed from the db!
            // If the item does not exist (s.isNotFound())
            // the seqno => key mapping could be removed; not even
            // a tombstone remains of that item.

            // Note: I account also the hits for deleted documents because it
            // is logically correct. But, we switch on the RocksDB built-in
            // Bloom Filter by default and we try to keep all the Filter blocks
            // in the BlockCache. So, I expect that the impact of old-seqno hits
            // is minimum in this case.
            scanOldSeqnoHits++;

            continue;
        }

        rocksdb::Slice valSlice(valueStr);

        // TODO RDB: Deal with collections
        DocKey key(reinterpret_cast<const uint8_t*>(keySlice.data()),
                   keySlice.size(),
                   DocNamespace::DefaultCollection);

        std::unique_ptr<Item> itm =
                makeItem(ctx->vbid, key, valSlice, isMetaOnly);

        // Cross-check the document's own seqno against the index entry.
        if (itm->getBySeqno() > seqno) {
            // TODO RDB: Old seqnos are never removed from the db!
            // If the item has a newer seqno now, the stale
            // seqno => key mapping could be removed
            scanOldSeqnoHits++;
            continue;
        } else if (itm->getBySeqno() < seqno) {
            // Must not happen within a snapshot: the index can never be
            // ahead of the document it points at.
            throw std::logic_error(
                    "RocksDBKVStore::scan: index has a higher seqno"
                    "than the document in a snapshot!");
        }

        bool includeDeletes =
                (ctx->docFilter == DocumentFilter::NO_DELETES) ? false : true;
        bool onlyKeys =
                (ctx->valFilter == ValueFilter::KEYS_ONLY) ? true : false;

        if (!includeDeletes && itm->isDeleted()) {
            continue;
        }
        int64_t byseqno = itm->getBySeqno();
        // Give the cache-lookup callback a chance to satisfy this item from
        // memory (ENGINE_KEY_EEXISTS) or to report back-pressure (ENOMEM).
        CacheLookup lookup(key,
                           byseqno,
                           ctx->vbid,
                           ctx->collectionsContext.getSeparator());
        ctx->lookup->callback(lookup);

        int status = ctx->lookup->getStatus();

        if (status == ENGINE_KEY_EEXISTS) {
            // Served from the in-memory cache; record progress and move on.
            ctx->lastReadSeqno = byseqno;
            continue;
        } else if (status == ENGINE_ENOMEM) {
            return scan_again;
        }

        GetValue rv(std::move(itm), ENGINE_SUCCESS, -1, onlyKeys);
        ctx->callback->callback(rv);
        status = ctx->callback->getStatus();

        if (status == ENGINE_ENOMEM) {
            return scan_again;
        }

        ctx->lastReadSeqno = byseqno;
    }

    cb_assert(it->status().ok()); // Check for any errors found during the scan

    return scan_success;
}
1507
destroyScanContext(ScanContext* ctx)1508 void RocksDBKVStore::destroyScanContext(ScanContext* ctx) {
1509 if (ctx == nullptr) {
1510 return;
1511 }
1512 std::lock_guard<std::mutex> lg(scanSnapshotsMutex);
1513 // TODO RDB: Might be nice to have the snapshot in the ctx and
1514 // release it on destruction
1515 auto it = scanSnapshots.find(ctx->scanId);
1516 if (it != scanSnapshots.end()) {
1517 scanSnapshots.erase(it);
1518 }
1519 delete ctx;
1520 }
1521
getStatFromMemUsage( const rocksdb::MemoryUtil::UsageType type, size_t& value)1522 bool RocksDBKVStore::getStatFromMemUsage(
1523 const rocksdb::MemoryUtil::UsageType type, size_t& value) {
1524 std::vector<rocksdb::DB*> dbs = {rdb.get()};
1525 auto cache_set = getCachePointers();
1526 std::map<rocksdb::MemoryUtil::UsageType, uint64_t> usageByType;
1527
1528 auto status = rocksdb::MemoryUtil::GetApproximateMemoryUsageByType(
1529 dbs, cache_set, &usageByType);
1530 if (!status.ok()) {
1531 logger.log(EXTENSION_LOG_NOTICE,
1532 "RocksDBKVStore::getStatFromMemUsage: "
1533 "GetApproximateMemoryUsageByType error: %s",
1534 status.getState());
1535 return false;
1536 }
1537
1538 value = usageByType.at(type);
1539
1540 return true;
1541 }
1542
getStatFromStatistics(const rocksdb::Tickers ticker, size_t& value)1543 bool RocksDBKVStore::getStatFromStatistics(const rocksdb::Tickers ticker,
1544 size_t& value) {
1545 const auto statistics = rdb->GetDBOptions().statistics;
1546 if (!statistics) {
1547 return false;
1548 }
1549 value = statistics->getTickerCount(ticker);
1550 return true;
1551 }
1552
getStatFromProperties(ColumnFamily cf, const std::string& property, size_t& value)1553 bool RocksDBKVStore::getStatFromProperties(ColumnFamily cf,
1554 const std::string& property,
1555 size_t& value) {
1556 value = 0;
1557 std::lock_guard<std::mutex> lg(vbhMutex);
1558 for (const auto vbh : vbHandles) {
1559 if (vbh) {
1560 rocksdb::ColumnFamilyHandle* cfh = nullptr;
1561 switch (cf) {
1562 case ColumnFamily::Default:
1563 cfh = vbh->defaultCFH.get();
1564 break;
1565 case ColumnFamily::Seqno:
1566 cfh = vbh->seqnoCFH.get();
1567 break;
1568 }
1569 if (!cfh) {
1570 return false;
1571 }
1572 std::string out;
1573 if (!rdb->GetProperty(cfh, property, &out)) {
1574 return false;
1575 }
1576 value += std::stoull(out);
1577 }
1578 }
1579
1580 return true;
1581 }
1582
1583 // As we implement a VBucket as a pair of two Column Families (a 'default' CF
1584 // and a 'local+seqno' CF), we need to re-set the 'write_buffer_size' for each
1585 // CF when the number of VBuckets managed by the current store changes. The
1586 // goal is to keep the total allocation for all the Memtables under the
1587 // 'rocksdb_memtables_ratio' given in configuration.
1588 // Thus, this function performs the following basic steps:
1589 // 1) Re-calculate the new sizes of all Memtables;
1590 // 2) Apply the new sizes.
1591 // We apply the new sizes using the rocksdb::DB::SetOptions() API. The
1592 // 'write_buffer_size' is a dynamically changeable option. This call changes
1593 // the size of mutable Memtables instantly. If the new size is below the
1594 // current allocation for the Memtable, the next key-value pair added will mark
1595 // the Memtable as immutable and will trigger a flush.
void RocksDBKVStore::applyMemtablesQuota(
        const std::lock_guard<std::mutex>& lock) {
    // Re-computes and applies the per-CF 'write_buffer_size' so that the
    // total Memtable allocation stays within 'rocksdb_memtables_ratio' of
    // the per-shard quota (see the comment block above this function).
    // The lock_guard parameter proves the caller holds the handles mutex.
    const auto vbuckets = getVBucketsCount(lock);

    auto& configuration =
            dynamic_cast<RocksDBKVStoreConfig&>(this->configuration);

    // 1) If configuration.getMemtablesRatio() == 0.0, then
    // we just want to use the baseline write_buffer_size.
    // 2) If vbuckets == 0, then there is no Memtable (this happens only
    // when the underlying RocksDB instance has just been created).
    // On both cases the following logic does not apply, so the
    // write_buffer_size for both the 'default' and the 'seqno' CFs is left
    // to the baseline value.
    if (configuration.getMemtablesRatio() > 0.0 && vbuckets > 0) {
        const auto memtablesQuota = configuration.getBucketQuota() /
                                    configuration.getMaxShards() *
                                    configuration.getMemtablesRatio();
        // TODO: for now I am hard-coding the percentage of Memtables Quota
        // that we allocate for the 'default' (90%) and 'seqno' (10%) CFs. The
        // plan is to expose this percentage as a configuration parameter in a
        // follow-up patch.
        const auto defaultCFMemtablesQuota = memtablesQuota * 0.9;
        const auto seqnoCFMemtablesQuota =
                memtablesQuota - defaultCFMemtablesQuota;

        // Set the write_buffer_size for the 'default' CF
        defaultCFOptions.write_buffer_size =
                defaultCFMemtablesQuota / vbuckets /
                defaultCFOptions.max_write_buffer_number;
        // Set the write_buffer_size for the 'seqno' CF
        seqnoCFOptions.write_buffer_size =
                seqnoCFMemtablesQuota / vbuckets /
                seqnoCFOptions.max_write_buffer_number;

        // Apply the new write_buffer_size (dynamically changeable option)
        // to every open VBucket's Column Families via DB::SetOptions().
        const std::unordered_map<std::string, std::string>
                newDefaultCFWriteBufferSize{std::make_pair(
                        "write_buffer_size",
                        std::to_string(defaultCFOptions.write_buffer_size))};
        const std::unordered_map<std::string, std::string>
                newSeqnoCFWriteBufferSize{std::make_pair(
                        "write_buffer_size",
                        std::to_string(seqnoCFOptions.write_buffer_size))};
        for (const auto& vbh : vbHandles) {
            if (vbh) {
                auto status = rdb->SetOptions(vbh->defaultCFH.get(),
                                              newDefaultCFWriteBufferSize);
                if (!status.ok()) {
                    throw std::runtime_error(
                            "RocksDBKVStore::applyMemtablesQuota: SetOptions "
                            "failed for [vbid: " +
                            std::to_string(vbh->vbid) + ", CF: default]: " +
                            status.getState());
                }
                status = rdb->SetOptions(vbh->seqnoCFH.get(),
                                         newSeqnoCFWriteBufferSize);
                if (!status.ok()) {
                    throw std::runtime_error(
                            "RocksDBKVStore::applyMemtablesQuota: SetOptions "
                            "failed for [vbid: " +
                            std::to_string(vbh->vbid) + ", CF: seqno]: " +
                            status.getState());
                }
            }
        }
    }

    // Overwrite Compaction options if Compaction Optimization is enabled
    // for the 'default' CF
    if (configuration.getDefaultCfOptimizeCompaction() == "level") {
        defaultCFOptions.OptimizeLevelStyleCompaction(
                defaultCFOptions.write_buffer_size);
    } else if (configuration.getDefaultCfOptimizeCompaction() == "universal") {
        defaultCFOptions.OptimizeUniversalStyleCompaction(
                defaultCFOptions.write_buffer_size);
    }
    // Overwrite Compaction options if Compaction Optimization is enabled
    // for the 'seqno' CF
    if (configuration.getSeqnoCfOptimizeCompaction() == "level") {
        seqnoCFOptions.OptimizeLevelStyleCompaction(
                seqnoCFOptions.write_buffer_size);
    } else if (configuration.getSeqnoCfOptimizeCompaction() == "universal") {
        seqnoCFOptions.OptimizeUniversalStyleCompaction(
                seqnoCFOptions.write_buffer_size);
    }
}
1683
getVBucketsCount( const std::lock_guard<std::mutex>&) const1684 size_t RocksDBKVStore::getVBucketsCount(
1685 const std::lock_guard<std::mutex>&) const {
1686 uint16_t count = 0;
1687 for (const auto& vbh : vbHandles) {
1688 if (vbh) {
1689 count++;
1690 }
1691 }
1692 return count;
1693 }
1694