1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2017 Couchbase, Inc.
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #include "config.h"
19 
20 #include "rocksdb-kvstore.h"
21 #include "rocksdb-kvstore_config.h"
22 
23 #include "ep_time.h"
24 
25 #include "kvstore_priv.h"
26 
27 #include <phosphor/phosphor.h>
28 #include <platform/sysinfo.h>
29 #include <rocksdb/convenience.h>
30 #include <rocksdb/filter_policy.h>
31 
32 #include <stdio.h>
33 #include <string.h>
34 #include <algorithm>
35 #include <gsl/gsl>
36 #include <limits>
37 #include <thread>
38 
39 #include "vbucket.h"
40 
41 namespace rockskv {
42 // MetaData is used to serialize and de-serialize metadata respectively when
43 // writing a Document mutation request to RocksDB and when reading a Document
44 // from RocksDB.
class MetaData {
public:
    // Builds a record with every field set to zero by delegating to the
    // field-wise constructor below.
    MetaData() : MetaData(false, 0, 0, 0, 0, 0, 0, 0, 0) {
    }

    // Field-wise constructor: each argument is stored, unchanged, in the
    // member of the same name.
    MetaData(bool deleted,
             uint8_t version,
             uint8_t datatype,
             uint32_t flags,
             uint32_t valueSize,
             time_t exptime,
             uint64_t cas,
             uint64_t revSeqno,
             int64_t bySeqno)
        : deleted(deleted),
          version(version),
          datatype(datatype),
          flags(flags),
          valueSize(valueSize),
          exptime(exptime),
          cas(cas),
          revSeqno(revSeqno),
          bySeqno(bySeqno) {
    }

// The `#pragma pack(1)` directive and the member order are intended to keep
// MetaData as small as possible and uniform across platforms. Instances are
// handed to RocksDB as raw bytes, so neither the order nor the types of the
// members below may change without changing the stored format.
#pragma pack(1)
    uint8_t deleted : 1; // 1-bit flag, non-zero for a deletion
    uint8_t version : 7; // 7-bit version number of this record
    uint8_t datatype;
    uint32_t flags;
    uint32_t valueSize;
    time_t exptime;
    uint64_t cas;
    uint64_t revSeqno;
    int64_t bySeqno;
#pragma pack()
};
91 } // namespace rockskv
92 
93 /**
94  * Class representing a document to be persisted in RocksDB.
95  */
96 class RocksRequest : public IORequest {
97 public:
98     /**
99      * Constructor
100      *
101      * @param item Item instance to be persisted
102      * @param callback Persistence Callback
103      * @param del Flag indicating if it is an item deletion or not
104      */
RocksRequest(const Item& item, MutationRequestCallback& callback)105     RocksRequest(const Item& item, MutationRequestCallback& callback)
106         : IORequest(item.getVBucketId(),
107                     callback,
108                     item.isDeleted(),
109                     item.getKey()),
110           docBody(item.getValue()) {
111         docMeta = rockskv::MetaData(
112                 item.isDeleted(),
113                 0,
114                 item.getDataType(),
115                 item.getFlags(),
116                 item.getNBytes(),
117                 item.isDeleted() ? ep_real_time() : item.getExptime(),
118                 item.getCas(),
119                 item.getRevSeqno(),
120                 item.getBySeqno());
121     }
122 
getDocMeta()123     const rockskv::MetaData& getDocMeta() {
124         return docMeta;
125     }
126 
127     // Get a rocksdb::Slice wrapping the Document MetaData
getDocMetaSlice()128     rocksdb::Slice getDocMetaSlice() {
129         return rocksdb::Slice(reinterpret_cast<char*>(&docMeta),
130                               sizeof(docMeta));
131     }
132 
133     // Get a rocksdb::Slice wrapping the Document Body
getDocBodySlice()134     rocksdb::Slice getDocBodySlice() {
135         const char* data = docBody ? docBody->getData() : nullptr;
136         size_t size = docBody ? docBody->valueSize() : 0;
137         return rocksdb::Slice(data, size);
138     }
139 
140 private:
141     rockskv::MetaData docMeta;
142     value_t docBody;
143 };
144 
145 // RocksDB docs suggest to "Use `rocksdb::DB::DestroyColumnFamilyHandle()` to
146 // close a column family instead of deleting the column family handle directly"
147 struct ColumnFamilyDeleter {
ColumnFamilyDeleterColumnFamilyDeleter148     ColumnFamilyDeleter(rocksdb::DB& db) : db(db) {
149     }
operator ()ColumnFamilyDeleter150     void operator()(rocksdb::ColumnFamilyHandle* cfh) {
151         db.DestroyColumnFamilyHandle(cfh);
152     }
153 
154 private:
155     rocksdb::DB& db;
156 };
157 using ColumnFamilyPtr =
158         std::unique_ptr<rocksdb::ColumnFamilyHandle, ColumnFamilyDeleter>;
159 
160 // The 'VBHandle' class is a wrapper around the ColumnFamilyHandles
161 // for a VBucket.
162 class VBHandle {
163 public:
VBHandle(rocksdb::DB& rdb, rocksdb::ColumnFamilyHandle* defaultCFH, rocksdb::ColumnFamilyHandle* seqnoCFH, uint16_t vbid)164     VBHandle(rocksdb::DB& rdb,
165              rocksdb::ColumnFamilyHandle* defaultCFH,
166              rocksdb::ColumnFamilyHandle* seqnoCFH,
167              uint16_t vbid)
168         : rdb(rdb),
169           defaultCFH(ColumnFamilyPtr(defaultCFH, rdb)),
170           seqnoCFH(ColumnFamilyPtr(seqnoCFH, rdb)),
171           vbid(vbid) {
172     }
173 
dropColumnFamilies()174     void dropColumnFamilies() {
175         // The call to DropColumnFamily() records the drop in the Manifest, but
176         // the actual remove will happen when the ColumFamilyHandle is deleted.
177         auto status = rdb.DropColumnFamily(defaultCFH.get());
178         if (!status.ok()) {
179             throw std::runtime_error(
180                     "VBHandle::dropColumnFamilies: DropColumnFamily failed for "
181                     "[vbid: " +
182                     std::to_string(vbid) + ", CF: default]: " +
183                     status.getState());
184         }
185         status = rdb.DropColumnFamily(seqnoCFH.get());
186         if (!status.ok()) {
187             throw std::runtime_error(
188                     "VBHandle::dropColumnFamilies: DropColumnFamily failed for "
189                     "[vbid: " +
190                     std::to_string(vbid) + ", CF: seqno]: " +
191                     status.getState());
192         }
193     }
194 
195     rocksdb::DB& rdb;
196     const ColumnFamilyPtr defaultCFH;
197     const ColumnFamilyPtr seqnoCFH;
198     const uint16_t vbid;
199 };
200 
RocksDBKVStore(RocksDBKVStoreConfig& configuration)201 RocksDBKVStore::RocksDBKVStore(RocksDBKVStoreConfig& configuration)
202     : KVStore(configuration),
203       vbHandles(configuration.getMaxVBuckets()),
204       in_transaction(false),
205       scanCounter(0),
206       logger(configuration.getLogger()) {
207     cachedVBStates.resize(configuration.getMaxVBuckets());
208     writeOptions.sync = true;
209 
210     // The RocksDB Options is a set of DBOptions and ColumnFamilyOptions.
211     // Together they cover all RocksDB available parameters.
212     auto status = rocksdb::GetDBOptionsFromString(
213             dbOptions, configuration.getDBOptions(), &dbOptions);
214     if (!status.ok()) {
215         throw std::invalid_argument(
216                 std::string("RocksDBKVStore::open: GetDBOptionsFromString "
217                             "error: ") +
218                 status.getState());
219     }
220 
221     // Set number of background threads - note these are per-environment, so
222     // are shared across all DB instances (vBuckets) and all Buckets.
223     auto lowPri = configuration.getLowPriBackgroundThreads();
224     if (lowPri == 0) {
225         lowPri = cb::get_available_cpu_count();
226     }
227     rocksdb::Env::Default()->SetBackgroundThreads(lowPri, rocksdb::Env::LOW);
228 
229     auto highPri = configuration.getHighPriBackgroundThreads();
230     if (highPri == 0) {
231         highPri = cb::get_available_cpu_count();
232     }
233     rocksdb::Env::Default()->SetBackgroundThreads(highPri, rocksdb::Env::HIGH);
234 
235     dbOptions.create_if_missing = true;
236 
237     // We use EventListener to set the correct ThreadLocal engine in the
238     // ObjectRegistry for the RocksDB Flusher and Compactor threads. This
239     // allows the memory tracker to track allocations and deallocations against
240     // the appropriate bucket.
241     auto eventListener =
242             std::make_shared<EventListener>(ObjectRegistry::getCurrentEngine());
243     dbOptions.listeners.emplace_back(eventListener);
244 
245     // Enable Statistics if 'Statistics::stat_level_' is provided by the
246     // configuration. We create a statistics object and pass to the multiple
247     // DBs managed by the same KVStore. Then the statistics object will contain
248     // aggregated values for all those DBs. Note that some stats are undefined
249     // and have no meaningful information across multiple DBs (e.g.,
250     // "rocksdb.sequence.number").
251     if (!configuration.getStatsLevel().empty()) {
252         dbOptions.statistics = rocksdb::CreateDBStatistics();
253         dbOptions.statistics->stats_level_ =
254                 getStatsLevel(configuration.getStatsLevel());
255     }
256 
257     // Apply the environment rate limit for Flush and Compaction
258     dbOptions.rate_limiter = configuration.getEnvRateLimiter();
259 
260     // Allocate the per-shard Block Cache
261     if (configuration.getBlockCacheRatio() > 0.0) {
262         auto blockCacheQuota = configuration.getBucketQuota() *
263                                configuration.getBlockCacheRatio();
264         // Keeping default settings for:
265         // num_shard_bits = -1 (automatically determined)
266         // strict_capacity_limit = false (do not fail insert when cache is full)
267         blockCache = rocksdb::NewLRUCache(
268                 blockCacheQuota / configuration.getMaxShards(),
269                 -1 /*num_shard_bits*/,
270                 false /*strict_capacity_limit*/,
271                 configuration.getBlockCacheHighPriPoolRatio());
272     }
273     // Configure all the Column Families
274     const auto& cfOptions = configuration.getCFOptions();
275     const auto& bbtOptions = configuration.getBBTOptions();
276     defaultCFOptions = getBaselineDefaultCFOptions();
277     seqnoCFOptions = getBaselineSeqnoCFOptions();
278     applyUserCFOptions(defaultCFOptions, cfOptions, bbtOptions);
279     applyUserCFOptions(seqnoCFOptions, cfOptions, bbtOptions);
280 
281     // Open the DB and load the ColumnFamilyHandle for all the
282     // existing Column Families (populates the 'vbHandles' vector)
283     openDB();
284 
285     // Calculate and apply the correct write_buffer_size for all Column
286     // Families. The Memtable size of each CF depends on the count of existing
287     // CFs in DB (besides other things). Thus, this must be called after
288     // 'openDB' (so that all the existing CFs have been loaded).
289     applyMemtablesQuota(std::lock_guard<std::mutex>(vbhMutex));
290 
291     // Read persisted VBs state
292     for (const auto vbh : vbHandles) {
293         if (vbh) {
294             readVBState(*vbh);
295             // Update stats
296             ++st.numLoadedVb;
297         }
298     }
299 }
300 
~RocksDBKVStore()301 RocksDBKVStore::~RocksDBKVStore() {
302     // Guarantees that all the ColumnFamilyHandles for the existing VBuckets
303     // are released before 'rdb' is deleted. From RocksDB docs:
304     //     "Before delete DB, you have to close All column families by calling
305     //      DestroyColumnFamilyHandle() with all the handles."
306     vbHandles.clear();
307     in_transaction = false;
308 }
309 
openDB()310 void RocksDBKVStore::openDB() {
311     auto dbname = getDBSubdir();
312     createDataDir(dbname);
313 
314     std::vector<std::string> cfs;
315     auto status = rocksdb::DB::ListColumnFamilies(dbOptions, dbname, &cfs);
316     if (!status.ok()) {
317         // If ListColumnFamilies failed because the DB does not exist,
318         // then it will be created the first time we call 'rocksdb::DB::Open'.
319         // Else, we throw an error if ListColumnFamilies failed for any other
320         // unexpected reason.
321         if (!(status.code() == rocksdb::Status::kIOError &&
322               std::string(status.getState())
323                               .find("No such file or directory") !=
324                       std::string::npos)) {
325             throw std::runtime_error(
326                     "RocksDBKVStore::openDB: ListColumnFamilies failed for DB "
327                     "'" +
328                     dbname + "': " + status.getState());
329         }
330     }
331 
332     // We need to pass a ColumnFamilyDescriptor for every existing CF.
333     // We populate 'cfDescriptors' so that it results in a vector
334     // containing packed CFs for every VBuckets, e.g. with
335     // MaxShards=4:
336     //     cfDescriptors[0] = default_0
337     //     cfDescriptors[1] = seqno_0
338     //     cfDescriptors[2] = default_4
339     //     cfDescriptors[3] = seqno_4
340     //     ..
341     // That helps us in populating 'vbHandles' later, because after
342     // 'rocksdb::DB::Open' handles[i] will be the handle that we will use
343     // to operate on the ColumnFamily at cfDescriptors[i].
344     std::vector<rocksdb::ColumnFamilyDescriptor> cfDescriptors;
345     for (uint16_t vbid = 0; vbid < configuration.getMaxVBuckets(); vbid++) {
346         if ((vbid % configuration.getMaxShards()) ==
347             configuration.getShardId()) {
348             std::string defaultCF = "default_" + std::to_string(vbid);
349             std::string seqnoCF = "local+seqno_" + std::to_string(vbid);
350             if (std::find(cfs.begin(), cfs.end(), defaultCF) != cfs.end()) {
351                 if (std::find(cfs.begin(), cfs.end(), seqnoCF) == cfs.end()) {
352                     throw std::logic_error("RocksDBKVStore::openDB: DB '" +
353                                            dbname +
354                                            "' is in inconsistent state: CF " +
355                                            seqnoCF + " not found.");
356                 }
357                 cfDescriptors.emplace_back(defaultCF, defaultCFOptions);
358                 cfDescriptors.emplace_back(seqnoCF, seqnoCFOptions);
359             }
360         }
361     }
362 
363     // TODO: The RocksDB built-in 'default' CF always exists, need to check if
364     // we can drop it.
365     cfDescriptors.emplace_back(rocksdb::kDefaultColumnFamilyName,
366                                rocksdb::ColumnFamilyOptions());
367     std::vector<rocksdb::ColumnFamilyHandle*> handles;
368     rocksdb::DB* db;
369     status = rocksdb::DB::Open(dbOptions, dbname, cfDescriptors, &handles, &db);
370     if (!status.ok()) {
371         throw std::runtime_error(
372                 "RocksDBKVStore::openDB: Open failed for database '" + dbname +
373                 "': " + status.getState());
374     }
375     rdb.reset(db);
376 
377     // The way we populated 'cfDescriptors' guarantees that: if 'cfDescriptors'
378     // contains more than only the RocksDB 'default' CF (i.e.,
379     // '(cfDescriptors.size() - 1) > 0') then 'cfDescriptors[i]' and
380     // 'cfDescriptors[i+1]' are respectively the 'default_' and 'seqno'_ CFs
381     // for a certain VBucket.
382     for (uint16_t i = 0; i < (cfDescriptors.size() - 1); i += 2) {
383         // Note: any further sanity-check is redundant as we will have always
384         // 'cf = "default_<vbid>"'.
385         const auto& cf = cfDescriptors[i].name;
386         uint16_t vbid = std::stoi(cf.substr(8));
387         vbHandles[vbid] = std::make_shared<VBHandle>(
388                 *rdb, handles[i], handles[i + 1], vbid);
389     }
390 
391     // We need to release the ColumnFamilyHandle for the built-in 'default' CF
392     // here, as it is not managed by any VBHandle.
393     rdb->DestroyColumnFamilyHandle(handles.back());
394 }
395 
getVBHandle(uint16_t vbid)396 std::shared_ptr<VBHandle> RocksDBKVStore::getVBHandle(uint16_t vbid) {
397     std::lock_guard<std::mutex> lg(vbhMutex);
398     if (vbHandles[vbid]) {
399         return vbHandles[vbid];
400     }
401 
402     // If the VBHandle for vbid does not exist it means that we need to create
403     // the VBucket, i.e. we need to create the set of CFs on DB for vbid
404     std::vector<rocksdb::ColumnFamilyDescriptor> cfDescriptors;
405     auto vbid_ = std::to_string(vbid);
406     cfDescriptors.emplace_back("default_" + vbid_, defaultCFOptions);
407     cfDescriptors.emplace_back("local+seqno_" + vbid_, seqnoCFOptions);
408 
409     std::vector<rocksdb::ColumnFamilyHandle*> handles;
410     auto status = rdb->CreateColumnFamilies(cfDescriptors, &handles);
411     if (!status.ok()) {
412         for (auto* cfh : handles) {
413             status = rdb->DropColumnFamily(cfh);
414             if (!status.ok()) {
415                 throw std::runtime_error(
416                         "RocksDBKVStore::getVBHandle: DropColumnFamily failed "
417                         "for CF " +
418                         cfh->GetName() + ": " + status.getState());
419             }
420         }
421         throw std::runtime_error(
422                 "RocksDBKVStore::getVBHandle: CreateColumnFamilies failed for "
423                 "vbid " +
424                 std::to_string(vbid) + ": " + status.getState());
425     }
426 
427     vbHandles[vbid] =
428             std::make_shared<VBHandle>(*rdb, handles[0], handles[1], vbid);
429 
430     // The number of VBuckets has increased, we need to re-balance the
431     // Memtables Quota among the CFs of existing VBuckets.
432     applyMemtablesQuota(lg);
433 
434     return vbHandles[vbid];
435 }
436 
getDBSubdir()437 std::string RocksDBKVStore::getDBSubdir() {
438     return configuration.getDBName() + "/rocksdb." +
439            std::to_string(configuration.getShardId());
440 }
441 
begin(std::unique_ptr<TransactionContext> txCtx)442 bool RocksDBKVStore::begin(std::unique_ptr<TransactionContext> txCtx) {
443     if (!txCtx) {
444         throw std::invalid_argument("RocksDBKVStore::begin: txCtx is null");
445     }
446     in_transaction = true;
447     transactionCtx = std::move(txCtx);
448     return in_transaction;
449 }
450 
commit(const Item* collectionsManifest)451 bool RocksDBKVStore::commit(const Item* collectionsManifest) {
452     // This behaviour is to replicate the one in Couchstore.
453     // If `commit` is called when not in transaction, just return true.
454     if (!in_transaction) {
455         return true;
456     }
457 
458     if (pendingReqs.size() == 0) {
459         in_transaction = false;
460         return true;
461     }
462 
463     // Swap `pendingReqs` with the temporary `commitBatch` so that we can
464     // shorten the scope of the lock.
465     std::vector<std::unique_ptr<RocksRequest>> commitBatch;
466     {
467         std::lock_guard<std::mutex> lock(writeMutex);
468         std::swap(pendingReqs, commitBatch);
469     }
470 
471     bool success = true;
472     auto vbid = commitBatch[0]->getVBucketId();
473 
474     // Flush all documents to disk
475     auto status = saveDocs(vbid, collectionsManifest, commitBatch);
476     if (!status.ok()) {
477         logger.log(EXTENSION_LOG_WARNING,
478                    "RocksDBKVStore::commit: saveDocs error:%d, "
479                    "vb:%" PRIu16,
480                    status.code(),
481                    vbid);
482         success = false;
483     }
484 
485     commitCallback(status, commitBatch);
486 
487     // This behaviour is to replicate the one in Couchstore.
488     // Set `in_transanction = false` only if `commit` is successful.
489     if (success) {
490         in_transaction = false;
491         transactionCtx.reset();
492     }
493 
494     return success;
495 }
496 
getMutationStatus(rocksdb::Status status)497 static int getMutationStatus(rocksdb::Status status) {
498     switch (status.code()) {
499     case rocksdb::Status::Code::kOk:
500         return MUTATION_SUCCESS;
501     case rocksdb::Status::Code::kNotFound:
502         // This return value causes ep-engine to drop the failed flush
503         return DOC_NOT_FOUND;
504     case rocksdb::Status::Code::kBusy:
505         // This return value causes ep-engine to keep re-queueing the failed
506         // flush
507         return MUTATION_FAILED;
508     default:
509         throw std::runtime_error(
510                 std::string("getMutationStatus: RocksDB error:") +
511                 std::string(status.getState()));
512     }
513 }
514 
commitCallback( rocksdb::Status status, const std::vector<std::unique_ptr<RocksRequest>>& commitBatch)515 void RocksDBKVStore::commitCallback(
516         rocksdb::Status status,
517         const std::vector<std::unique_ptr<RocksRequest>>& commitBatch) {
518     for (const auto& request : commitBatch) {
519         auto dataSize = request->getDocMetaSlice().size() +
520                         request->getDocBodySlice().size();
521         const auto& key = request->getKey();
522         /* update ep stats */
523         ++st.io_num_write;
524         st.io_write_bytes += (key.size() + dataSize);
525 
526         auto rv = getMutationStatus(status);
527         if (request->isDelete()) {
528             if (status.code()) {
529                 ++st.numDelFailure;
530             } else {
531                 st.delTimeHisto.add(request->getDelta() / 1000);
532             }
533             if (rv != -1) {
534                 // TODO: Should set `rv` to 1 or 0 depending on if this is a
535                 // delete to an existing (1) or non-existing (0) item. However,
536                 // to achieve this we would need to perform a Get to RocksDB
537                 // which is costly. For now just assume that the item did exist.
538                 rv = 1;
539             }
540             request->getDelCallback()->callback(*transactionCtx, rv);
541         } else {
542             if (status.code()) {
543                 ++st.numSetFailure;
544             } else {
545                 st.writeTimeHisto.add(request->getDelta() / 1000);
546                 st.writeSizeHisto.add(dataSize + key.size());
547             }
548             // TODO: Should set `mr.second` to true or false depending on if
549             // this is an insertion (true) or an update of an existing item
550             // (false). However, to achieve this we would need to perform a Get
551             // to RocksDB which is costly. For now just assume that the item
552             // did not exist.
553             mutation_result mr = std::make_pair(1, true);
554             request->getSetCallback()->callback(*transactionCtx, mr);
555         }
556     }
557 }
558 
rollback()559 void RocksDBKVStore::rollback() {
560     if (in_transaction) {
561         in_transaction = false;
562         transactionCtx.reset();
563     }
564 }
565 
listPersistedVbuckets()566 std::vector<vbucket_state*> RocksDBKVStore::listPersistedVbuckets() {
567     std::vector<vbucket_state*> result;
568     for (const auto& vb : cachedVBStates) {
569         result.emplace_back(vb.get());
570     }
571     return result;
572 }
573 
set(const Item& item, Callback<TransactionContext, mutation_result>& cb)574 void RocksDBKVStore::set(const Item& item,
575                          Callback<TransactionContext, mutation_result>& cb) {
576     if (!in_transaction) {
577         throw std::logic_error(
578                 "RocksDBKVStore::set: in_transaction must be true to perform a "
579                 "set operation.");
580     }
581     MutationRequestCallback callback;
582     callback.setCb = &cb;
583     pendingReqs.push_back(std::make_unique<RocksRequest>(item, callback));
584 }
585 
get(const StoredDocKey& key, uint16_t vb, bool fetchDelete)586 GetValue RocksDBKVStore::get(const StoredDocKey& key,
587                              uint16_t vb,
588                              bool fetchDelete) {
589     return getWithHeader(nullptr, key, vb, GetMetaOnly::No, fetchDelete);
590 }
591 
getWithHeader(void* dbHandle, const StoredDocKey& key, uint16_t vb, GetMetaOnly getMetaOnly, bool fetchDelete)592 GetValue RocksDBKVStore::getWithHeader(void* dbHandle,
593                                        const StoredDocKey& key,
594                                        uint16_t vb,
595                                        GetMetaOnly getMetaOnly,
596                                        bool fetchDelete) {
597     std::string value;
598     const auto vbh = getVBHandle(vb);
599     // TODO RDB: use a PinnableSlice to avoid some memcpy
600     rocksdb::Slice keySlice = getKeySlice(key);
601     rocksdb::Status s = rdb->Get(
602             rocksdb::ReadOptions(), vbh->defaultCFH.get(), keySlice, &value);
603     if (!s.ok()) {
604         return GetValue{NULL, ENGINE_KEY_ENOENT};
605     }
606     return makeGetValue(vb, key, value, getMetaOnly);
607 }
608 
getMulti(uint16_t vb, vb_bgfetch_queue_t& itms)609 void RocksDBKVStore::getMulti(uint16_t vb, vb_bgfetch_queue_t& itms) {
610     // TODO RDB: RocksDB supports a multi get which we should use here.
611     for (auto& it : itms) {
612         auto& key = it.first;
613         rocksdb::Slice keySlice = getKeySlice(key);
614         std::string value;
615         const auto vbh = getVBHandle(vb);
616         rocksdb::Status s = rdb->Get(rocksdb::ReadOptions(),
617                                      vbh->defaultCFH.get(),
618                                      keySlice,
619                                      &value);
620         if (s.ok()) {
621             it.second.value =
622                     makeGetValue(vb, key, value, it.second.isMetaOnly);
623             GetValue* rv = &it.second.value;
624             for (auto& fetch : it.second.bgfetched_list) {
625                 fetch->value = rv;
626             }
627         } else {
628             for (auto& fetch : it.second.bgfetched_list) {
629                 fetch->value->setStatus(ENGINE_KEY_ENOENT);
630             }
631         }
632     }
633 }
634 
// Not implemented yet: the call is currently a no-op and 'vbucketId' is
// ignored.
void RocksDBKVStore::reset(uint16_t vbucketId) {
    // TODO RDB:  Implement.
}
638 
del(const Item& item, Callback<TransactionContext, int>& cb)639 void RocksDBKVStore::del(const Item& item,
640                          Callback<TransactionContext, int>& cb) {
641     if (!item.isDeleted()) {
642         throw std::invalid_argument(
643                 "RocksDBKVStore::del item to delete is not marked as deleted.");
644     }
645     if (!in_transaction) {
646         throw std::logic_error(
647                 "RocksDBKVStore::del: in_transaction must be true to perform a "
648                 "delete operation.");
649     }
650     // TODO: Deleted items remain as tombstones, but are not yet expired,
651     // they will accumuate forever.
652     MutationRequestCallback callback;
653     callback.delCb = &cb;
654     pendingReqs.push_back(std::make_unique<RocksRequest>(item, callback));
655 }
656 
delVBucket(uint16_t vbid, uint64_t vb_version)657 void RocksDBKVStore::delVBucket(uint16_t vbid, uint64_t vb_version) {
658     std::lock_guard<std::mutex> lg1(writeMutex);
659     std::lock_guard<std::mutex> lg2(vbhMutex);
660 
661     if (!vbHandles[vbid]) {
662         logger.log(EXTENSION_LOG_WARNING,
663                    "RocksDBKVStore::delVBucket: VBucket not found, vb:%" PRIu16,
664                    vbid);
665         return;
666     }
667 
668     // 'vbHandles' stores a shared_ptr to VBHandle for each VBucket . The
669     // ownership of each pointer is shared among multiple threads performing
670     // different operations (e.g., 'get' and 'commit').
671     // We want to call 'DropColumnFamily' here rather than in other threads
672     // because it is an expensive, IO-intensive operation and we do not want
673     // it to cause another thread (possibly a front-end one) from being blocked
674     // performing the drop.
675     // So, the thread executing 'delVBucket' spins until it is the exclusive
676     // owner of the shared_ptr (i.e., other concurrent threads like 'commit'
677     // have completed and do not own any copy of the shared_ptr).
678     {
679         std::shared_ptr<VBHandle> sharedPtr;
680         std::swap(vbHandles[vbid], sharedPtr);
681         while (!sharedPtr.unique()) {
682             std::this_thread::sleep_for(std::chrono::microseconds(100));
683         }
684         // Drop all the CF for vbid.
685         sharedPtr->dropColumnFamilies();
686     }
687 
688     // The number of VBuckets has decreased, we need to re-balance the
689     // Memtables Quota among the CFs of existing VBuckets.
690     applyMemtablesQuota(lg2);
691 }
692 
snapshotVBucket(uint16_t vbucketId, const vbucket_state& vbstate, VBStatePersist options)693 bool RocksDBKVStore::snapshotVBucket(uint16_t vbucketId,
694                                      const vbucket_state& vbstate,
695                                      VBStatePersist options) {
696     // TODO RDB: Refactor out behaviour common to this and CouchKVStore
697     auto start = ProcessClock::now();
698 
699     if (updateCachedVBState(vbucketId, vbstate) &&
700         (options == VBStatePersist::VBSTATE_PERSIST_WITHOUT_COMMIT ||
701          options == VBStatePersist::VBSTATE_PERSIST_WITH_COMMIT)) {
702         const auto vbh = getVBHandle(vbucketId);
703         rocksdb::WriteBatch batch;
704         auto status = saveVBStateToBatch(*vbh, vbstate, batch);
705         if (!status.ok()) {
706             logger.log(EXTENSION_LOG_WARNING,
707                        "RocksDBKVStore::snapshotVBucket: saveVBStateToBatch() "
708                        "failed state:%s vb:%" PRIu16 " :%s",
709                        VBucket::toString(vbstate.state),
710                        vbucketId,
711                        status.getState());
712             return false;
713         }
714         status = rdb->Write(writeOptions, &batch);
715         if (!status.ok()) {
716             logger.log(EXTENSION_LOG_WARNING,
717                        "RocksDBKVStore::snapshotVBucket: Write() "
718                        "failed state:%s vb:%" PRIu16 " :%s",
719                        VBucket::toString(vbstate.state),
720                        vbucketId,
721                        status.getState());
722             return false;
723         }
724     }
725 
726     LOG(EXTENSION_LOG_DEBUG,
727         "RocksDBKVStore::snapshotVBucket: Snapshotted vbucket:%" PRIu16
728         " state:%s",
729         vbucketId,
730         vbstate.toJSON().c_str());
731 
732     st.snapshotHisto.add(std::chrono::duration_cast<std::chrono::microseconds>(
733             ProcessClock::now() - start));
734 
735     return true;
736 }
737 
// Not implemented yet: the stats passed in are ignored and the call always
// reports success.
bool RocksDBKVStore::snapshotStats(const std::map<std::string, std::string>&) {
    // TODO RDB:  Implement
    return true;
}
742 
// Not implemented yet: the call is currently a no-op and its argument is
// ignored.
void RocksDBKVStore::destroyInvalidVBuckets(bool) {
    // TODO RDB:  implement
}
746 
getNumShards()747 size_t RocksDBKVStore::getNumShards() {
748     return configuration.getMaxShards();
749 }
750 
getStat(const char* name_, size_t& value)751 bool RocksDBKVStore::getStat(const char* name_, size_t& value) {
752     std::string name(name_);
753 
754     // Memory Usage
755     if (name == "kMemTableTotal") {
756         return getStatFromMemUsage(rocksdb::MemoryUtil::kMemTableTotal, value);
757     } else if (name == "kMemTableUnFlushed") {
758         return getStatFromMemUsage(rocksdb::MemoryUtil::kMemTableUnFlushed,
759                                    value);
760     } else if (name == "kTableReadersTotal") {
761         return getStatFromMemUsage(rocksdb::MemoryUtil::kTableReadersTotal,
762                                    value);
763     } else if (name == "kCacheTotal") {
764         return getStatFromMemUsage(rocksdb::MemoryUtil::kCacheTotal, value);
765     }
766 
767     // MemTable Size per Column Famiy
768     else if (name == "default_kSizeAllMemTables") {
769         return getStatFromProperties(ColumnFamily::Default,
770                                      rocksdb::DB::Properties::kSizeAllMemTables,
771                                      value);
772     } else if (name == "seqno_kSizeAllMemTables") {
773         return getStatFromProperties(ColumnFamily::Seqno,
774                                      rocksdb::DB::Properties::kSizeAllMemTables,
775                                      value);
776     }
777 
778     // Block Cache hit/miss
779     else if (name == "rocksdb.block.cache.hit") {
780         return getStatFromStatistics(rocksdb::Tickers::BLOCK_CACHE_HIT, value);
781     } else if (name == "rocksdb.block.cache.miss") {
782         return getStatFromStatistics(rocksdb::Tickers::BLOCK_CACHE_MISS, value);
783     } else if (name == "rocksdb.block.cache.data.hit") {
784         return getStatFromStatistics(rocksdb::Tickers::BLOCK_CACHE_DATA_HIT,
785                                      value);
786     } else if (name == "rocksdb.block.cache.data.miss") {
787         return getStatFromStatistics(rocksdb::Tickers::BLOCK_CACHE_DATA_MISS,
788                                      value);
789     } else if (name == "rocksdb.block.cache.index.hit") {
790         return getStatFromStatistics(rocksdb::Tickers::BLOCK_CACHE_INDEX_HIT,
791                                      value);
792     } else if (name == "rocksdb.block.cache.index.miss") {
793         return getStatFromStatistics(rocksdb::Tickers::BLOCK_CACHE_INDEX_MISS,
794                                      value);
795     } else if (name == "rocksdb.block.cache.filter.hit") {
796         return getStatFromStatistics(rocksdb::Tickers::BLOCK_CACHE_FILTER_HIT,
797                                      value);
798     } else if (name == "rocksdb.block.cache.filter.miss") {
799         return getStatFromStatistics(rocksdb::Tickers::BLOCK_CACHE_FILTER_MISS,
800                                      value);
801     }
802 
803     // Disk Usage per Column Family
804     else if (name == "default_kTotalSstFilesSize") {
805         return getStatFromProperties(
806                 ColumnFamily::Default,
807                 rocksdb::DB::Properties::kTotalSstFilesSize,
808                 value);
809     } else if (name == "seqno_kTotalSstFilesSize") {
810         return getStatFromProperties(
811                 ColumnFamily::Seqno,
812                 rocksdb::DB::Properties::kTotalSstFilesSize,
813                 value);
814     }
815 
816     // Scan stats
817     else if (name == "scan_totalSeqnoHits") {
818         value = scanTotalSeqnoHits.load();
819         return true;
820     } else if (name == "scan_oldSeqnoHits") {
821         value = scanOldSeqnoHits.load();
822         return true;
823     }
824 
825     return false;
826 }
827 
getStorageProperties(void)828 StorageProperties RocksDBKVStore::getStorageProperties(void) {
829     StorageProperties rv(StorageProperties::EfficientVBDump::Yes,
830                          StorageProperties::EfficientVBDeletion::Yes,
831                          StorageProperties::PersistedDeletion::No,
832                          // TODO RDB: Not strictly true, multiGet
833                          // does not yet use the underlying multi get
834                          // of RocksDB
835                          StorageProperties::EfficientGet::Yes,
836                          StorageProperties::ConcurrentWriteCompact::Yes);
837     return rv;
838 }
839 
getCachePointers()840 std::unordered_set<const rocksdb::Cache*> RocksDBKVStore::getCachePointers() {
841     std::unordered_set<const rocksdb::Cache*> cache_set;
842 
843     // TODO: Cache from DBImpl. The 'std::shared_ptr<Cache>
844     // table_cache_' pointer is not exposed through the 'DB' interface
845 
846     // Cache from DBOptions
847     // Note: we do not use the 'row_cache' currently.
848     cache_set.insert(rdb->GetDBOptions().row_cache.get());
849 
850     // Cache from table factories.
851     addCFBlockCachePointers(defaultCFOptions, cache_set);
852     addCFBlockCachePointers(seqnoCFOptions, cache_set);
853 
854     return cache_set;
855 }
856 
addCFBlockCachePointers( const rocksdb::ColumnFamilyOptions& cfOptions, std::unordered_set<const rocksdb::Cache*>& cache_set)857 void RocksDBKVStore::addCFBlockCachePointers(
858         const rocksdb::ColumnFamilyOptions& cfOptions,
859         std::unordered_set<const rocksdb::Cache*>& cache_set) {
860     if (cfOptions.table_factory) {
861         auto* table_options = cfOptions.table_factory->GetOptions();
862         auto* bbt_options =
863                 static_cast<rocksdb::BlockBasedTableOptions*>(table_options);
864         cache_set.insert(bbt_options->block_cache.get());
865         cache_set.insert(bbt_options->block_cache_compressed.get());
866     }
867 }
868 
getStatsLevel( const std::string& stats_level)869 rocksdb::StatsLevel RocksDBKVStore::getStatsLevel(
870         const std::string& stats_level) {
871     if (stats_level == "kExceptDetailedTimers") {
872         return rocksdb::StatsLevel::kExceptDetailedTimers;
873     } else if (stats_level == "kExceptTimeForMutex") {
874         return rocksdb::StatsLevel::kExceptTimeForMutex;
875     } else if (stats_level == "kAll") {
876         return rocksdb::StatsLevel::kAll;
877     } else {
878         throw std::invalid_argument(
879                 std::string("RocksDBKVStore::getStatsLevel: stats_level: '") +
880                 stats_level + std::string("'"));
881     }
882 }
883 
getKeySlice(const DocKey& key)884 rocksdb::Slice RocksDBKVStore::getKeySlice(const DocKey& key) {
885     return rocksdb::Slice(reinterpret_cast<const char*>(key.data()),
886                           key.size());
887 }
888 
getSeqnoSlice(const int64_t* seqno)889 rocksdb::Slice RocksDBKVStore::getSeqnoSlice(const int64_t* seqno) {
890     return rocksdb::Slice(reinterpret_cast<const char*>(seqno), sizeof(*seqno));
891 }
892 
getNumericSeqno(const rocksdb::Slice& seqnoSlice)893 int64_t RocksDBKVStore::getNumericSeqno(const rocksdb::Slice& seqnoSlice) {
894     assert(seqnoSlice.size() == sizeof(int64_t));
895     int64_t seqno;
896     std::memcpy(&seqno, seqnoSlice.data(), seqnoSlice.size());
897     return seqno;
898 }
899 
makeItem(uint16_t vb, const DocKey& key, const rocksdb::Slice& s, GetMetaOnly getMetaOnly)900 std::unique_ptr<Item> RocksDBKVStore::makeItem(uint16_t vb,
901                                                const DocKey& key,
902                                                const rocksdb::Slice& s,
903                                                GetMetaOnly getMetaOnly) {
904     assert(s.size() >= sizeof(rockskv::MetaData));
905 
906     const char* data = s.data();
907 
908     rockskv::MetaData meta;
909     std::memcpy(&meta, data, sizeof(meta));
910     data += sizeof(meta);
911 
912     bool includeValue = getMetaOnly == GetMetaOnly::No && meta.valueSize;
913 
914     auto item = std::make_unique<Item>(key,
915                                        meta.flags,
916                                        meta.exptime,
917                                        includeValue ? data : nullptr,
918                                        includeValue ? meta.valueSize : 0,
919                                        meta.datatype,
920                                        meta.cas,
921                                        meta.bySeqno,
922                                        vb,
923                                        meta.revSeqno);
924 
925     if (meta.deleted) {
926         item->setDeleted();
927     }
928 
929     return item;
930 }
931 
makeGetValue(uint16_t vb, const DocKey& key, const std::string& value, GetMetaOnly getMetaOnly)932 GetValue RocksDBKVStore::makeGetValue(uint16_t vb,
933                                       const DocKey& key,
934                                       const std::string& value,
935                                       GetMetaOnly getMetaOnly) {
936     rocksdb::Slice sval(value);
937     return GetValue(
938             makeItem(vb, key, sval, getMetaOnly), ENGINE_SUCCESS, -1, 0);
939 }
940 
readVBState(const VBHandle& vbh)941 void RocksDBKVStore::readVBState(const VBHandle& vbh) {
942     // Largely copied from CouchKVStore
943     // TODO RDB: refactor out sections common to CouchKVStore
944     vbucket_state_t state = vbucket_state_dead;
945     uint64_t checkpointId = 0;
946     uint64_t maxDeletedSeqno = 0;
947     int64_t highSeqno = readHighSeqnoFromDisk(vbh);
948     std::string failovers;
949     uint64_t purgeSeqno = 0;
950     uint64_t lastSnapStart = 0;
951     uint64_t lastSnapEnd = 0;
952     uint64_t maxCas = 0;
953     int64_t hlcCasEpochSeqno = HlcCasSeqnoUninitialised;
954     bool mightContainXattrs = false;
955 
956     auto key = getVbstateKey();
957     std::string vbstate;
958     auto vbid = vbh.vbid;
959     auto status = rdb->Get(rocksdb::ReadOptions(),
960                            vbh.seqnoCFH.get(),
961                            getSeqnoSlice(&key),
962                            &vbstate);
963     if (!status.ok()) {
964         if (status.IsNotFound()) {
965             logger.log(EXTENSION_LOG_NOTICE,
966                        "RocksDBKVStore::readVBState: '_local/vbstate.%" PRIu16
967                        "' not found",
968                        vbid);
969         } else {
970             logger.log(EXTENSION_LOG_WARNING,
971                        "RocksDBKVStore::readVBState: error getting vbstate "
972                        "error:%s, vb:%" PRIu16,
973                        status.getState(),
974                        vbid);
975         }
976     } else {
977         cJSON* jsonObj = cJSON_Parse(vbstate.c_str());
978         if (!jsonObj) {
979             logger.log(EXTENSION_LOG_WARNING,
980                        "RocksKVStore::readVBState: Failed to parse the vbstat "
981                        "json doc for vb:%" PRIu16 ", json:%s",
982                        vbid,
983                        vbstate.c_str());
984         }
985 
986         const std::string vb_state =
987                 getJSONObjString(cJSON_GetObjectItem(jsonObj, "state"));
988         const std::string checkpoint_id =
989                 getJSONObjString(cJSON_GetObjectItem(jsonObj, "checkpoint_id"));
990         const std::string max_deleted_seqno = getJSONObjString(
991                 cJSON_GetObjectItem(jsonObj, "max_deleted_seqno"));
992         const std::string snapStart =
993                 getJSONObjString(cJSON_GetObjectItem(jsonObj, "snap_start"));
994         const std::string snapEnd =
995                 getJSONObjString(cJSON_GetObjectItem(jsonObj, "snap_end"));
996         const std::string maxCasValue =
997                 getJSONObjString(cJSON_GetObjectItem(jsonObj, "max_cas"));
998         const std::string hlcCasEpoch =
999                 getJSONObjString(cJSON_GetObjectItem(jsonObj, "hlc_epoch"));
1000         mightContainXattrs = getJSONObjBool(
1001                 cJSON_GetObjectItem(jsonObj, "might_contain_xattrs"));
1002 
1003         cJSON* failover_json = cJSON_GetObjectItem(jsonObj, "failover_table");
1004         if (vb_state.compare("") == 0 || checkpoint_id.compare("") == 0 ||
1005             max_deleted_seqno.compare("") == 0) {
1006             logger.log(EXTENSION_LOG_WARNING,
1007                        "RocksDBKVStore::readVBState: State"
1008                        " JSON doc for vb:%" PRIu16
1009                        " is in the wrong format:%s, "
1010                        "vb state:%s, checkpoint id:%s and max deleted seqno:%s",
1011                        vbid,
1012                        vbstate.c_str(),
1013                        vb_state.c_str(),
1014                        checkpoint_id.c_str(),
1015                        max_deleted_seqno.c_str());
1016         } else {
1017             state = VBucket::fromString(vb_state.c_str());
1018             maxDeletedSeqno = std::stoull(max_deleted_seqno);
1019             checkpointId = std::stoull(checkpoint_id);
1020 
1021             if (snapStart.compare("") == 0) {
1022                 lastSnapStart = highSeqno;
1023             } else {
1024                 lastSnapStart = std::stoull(snapStart.c_str());
1025             }
1026 
1027             if (snapEnd.compare("") == 0) {
1028                 lastSnapEnd = highSeqno;
1029             } else {
1030                 lastSnapEnd = std::stoull(snapEnd.c_str());
1031             }
1032 
1033             if (maxCasValue.compare("") != 0) {
1034                 maxCas = std::stoull(maxCasValue.c_str());
1035             }
1036 
1037             if (!hlcCasEpoch.empty()) {
1038                 hlcCasEpochSeqno = std::stoull(hlcCasEpoch);
1039             }
1040 
1041             if (failover_json) {
1042                 failovers = to_string(failover_json, false);
1043             }
1044         }
1045         cJSON_Delete(jsonObj);
1046     }
1047 
1048     cachedVBStates[vbh.vbid] =
1049             std::make_unique<vbucket_state>(state,
1050                                             checkpointId,
1051                                             maxDeletedSeqno,
1052                                             highSeqno,
1053                                             purgeSeqno,
1054                                             lastSnapStart,
1055                                             lastSnapEnd,
1056                                             maxCas,
1057                                             hlcCasEpochSeqno,
1058                                             mightContainXattrs,
1059                                             failovers);
1060 }
1061 
saveVBStateToBatch(const VBHandle& vbh, const vbucket_state& vbState, rocksdb::WriteBatch& batch)1062 rocksdb::Status RocksDBKVStore::saveVBStateToBatch(const VBHandle& vbh,
1063                                                    const vbucket_state& vbState,
1064                                                    rocksdb::WriteBatch& batch) {
1065     std::stringstream jsonState;
1066 
1067     jsonState << "{\"state\": \"" << VBucket::toString(vbState.state) << "\""
1068               << ",\"checkpoint_id\": \"" << vbState.checkpointId << "\""
1069               << ",\"max_deleted_seqno\": \"" << vbState.maxDeletedSeqno
1070               << "\"";
1071     if (!vbState.failovers.empty()) {
1072         jsonState << ",\"failover_table\": " << vbState.failovers;
1073     }
1074     jsonState << ",\"snap_start\": \"" << vbState.lastSnapStart << "\""
1075               << ",\"snap_end\": \"" << vbState.lastSnapEnd << "\""
1076               << ",\"max_cas\": \"" << vbState.maxCas << "\""
1077               << ",\"hlc_epoch\": \"" << vbState.hlcCasEpochSeqno << "\"";
1078 
1079     if (vbState.mightContainXattrs) {
1080         jsonState << ",\"might_contain_xattrs\": true";
1081     } else {
1082         jsonState << ",\"might_contain_xattrs\": false";
1083     }
1084 
1085     jsonState << "}";
1086 
1087     auto key = getVbstateKey();
1088     rocksdb::Slice keySlice = getSeqnoSlice(&key);
1089     return batch.Put(vbh.seqnoCFH.get(), keySlice, jsonState.str());
1090 }
1091 
getBaselineDefaultCFOptions()1092 rocksdb::ColumnFamilyOptions RocksDBKVStore::getBaselineDefaultCFOptions() {
1093     rocksdb::ColumnFamilyOptions cfOptions;
1094     // Enable Point Lookup Optimization for the 'default' Column Family
1095     // Note: whatever we give in input as 'block_cache_size_mb', the Block
1096     // Cache will be reset with the shared 'blockCache' of size
1097     // 'rocksdb_block_cache_size'
1098     cfOptions.OptimizeForPointLookup(1);
1099     return cfOptions;
1100 }
1101 
getBaselineSeqnoCFOptions()1102 rocksdb::ColumnFamilyOptions RocksDBKVStore::getBaselineSeqnoCFOptions() {
1103     rocksdb::ColumnFamilyOptions cfOptions;
1104     cfOptions.comparator = &seqnoComparator;
1105     return cfOptions;
1106 }
1107 
applyUserCFOptions(rocksdb::ColumnFamilyOptions& cfOptions, const std::string& newCfOptions, const std::string& newBbtOptions)1108 void RocksDBKVStore::applyUserCFOptions(rocksdb::ColumnFamilyOptions& cfOptions,
1109                                         const std::string& newCfOptions,
1110                                         const std::string& newBbtOptions) {
1111     // Apply 'newCfOptions' on top of 'cfOptions'
1112     auto status = rocksdb::GetColumnFamilyOptionsFromString(
1113             cfOptions, newCfOptions, &cfOptions);
1114     if (!status.ok()) {
1115         throw std::invalid_argument(
1116                 std::string("RocksDBKVStore::applyUserCFOptions:  "
1117                             "GetColumnFamilyOptionsFromString error: ") +
1118                 status.getState());
1119     }
1120 
1121     // RocksDB ColumnFamilyOptions provide advanced options for the
1122     // Block Based Table file format, which is the default format for SST files.
1123     // Apply 'newBbtOptions' on top of the current BlockBasedTableOptions of
1124     // 'cfOptions'
1125     rocksdb::BlockBasedTableOptions baseOptions;
1126     if (cfOptions.table_factory) {
1127         auto* bbtOptions = cfOptions.table_factory->GetOptions();
1128         if (bbtOptions) {
1129             baseOptions = *(
1130                     static_cast<rocksdb::BlockBasedTableOptions*>(bbtOptions));
1131         }
1132     }
1133 
1134     rocksdb::BlockBasedTableOptions tableOptions;
1135     status = rocksdb::GetBlockBasedTableOptionsFromString(
1136             baseOptions, newBbtOptions, &tableOptions);
1137     if (!status.ok()) {
1138         throw std::invalid_argument(
1139                 std::string("RocksDBKVStore::applyUserCFOptions: "
1140                             "GetBlockBasedTableOptionsFromString error: ") +
1141                 status.getState());
1142     }
1143 
1144     // If using Partitioned Filters, then use the RocksDB recommended params
1145     // (https://github.com/facebook/rocksdb/blob/master/include/rocksdb/filter_policy.h#L133):
1146     //     "bits_per_key: bits per key in bloom filter. A good value for
1147     //           bits_per_key is 10, which yields a filter with ~1% false
1148     //           positive rate.
1149     //       use_block_based_builder: use block based filter rather than full
1150     //           filter. If you want to build a full filter, it needs to be
1151     //           set to false."
1152     if (tableOptions.partition_filters == true) {
1153         tableOptions.filter_policy.reset(
1154                 rocksdb::NewBloomFilterPolicy(10, false));
1155     }
1156 
1157     // Always use the per-shard shared Block Cache. If it is nullptr, RocksDB
1158     // will allocate a default size Block Cache.
1159     tableOptions.block_cache = blockCache;
1160 
1161     // Set the new BlockBasedTableOptions
1162     cfOptions.table_factory.reset(
1163             rocksdb::NewBlockBasedTableFactory(tableOptions));
1164 
1165     // Set the user-provided size amplification factor if under Universal
1166     // Compaction
1167     if (cfOptions.compaction_style ==
1168         rocksdb::CompactionStyle::kCompactionStyleUniversal) {
1169         auto& configuration =
1170                 dynamic_cast<RocksDBKVStoreConfig&>(this->configuration);
1171         cfOptions.compaction_options_universal.max_size_amplification_percent =
1172                 configuration.getUCMaxSizeAmplificationPercent();
1173     }
1174 }
1175 
writeAndTimeBatch(rocksdb::WriteBatch batch)1176 rocksdb::Status RocksDBKVStore::writeAndTimeBatch(rocksdb::WriteBatch batch) {
1177     auto begin = ProcessClock::now();
1178     auto status = rdb->Write(writeOptions, &batch);
1179     st.commitHisto.add(std::chrono::duration_cast<std::chrono::microseconds>(
1180             ProcessClock::now() - begin));
1181     return status;
1182 }
1183 
saveDocs( uint16_t vbid, const Item* collectionsManifest, const std::vector<std::unique_ptr<RocksRequest>>& commitBatch)1184 rocksdb::Status RocksDBKVStore::saveDocs(
1185         uint16_t vbid,
1186         const Item* collectionsManifest,
1187         const std::vector<std::unique_ptr<RocksRequest>>& commitBatch) {
1188     auto reqsSize = commitBatch.size();
1189     if (reqsSize == 0) {
1190         st.docsCommitted = 0;
1191         return rocksdb::Status::OK();
1192     }
1193 
1194     auto& vbstate = cachedVBStates[vbid];
1195     if (vbstate == nullptr) {
1196         throw std::logic_error("RocksDBKVStore::saveDocs: cachedVBStates[" +
1197                                std::to_string(vbid) + "] is NULL");
1198     }
1199 
1200     rocksdb::Status status;
1201     int64_t maxDBSeqno = 0;
1202     rocksdb::WriteBatch batch;
1203 
1204     const auto vbh = getVBHandle(vbid);
1205 
1206     for (const auto& request : commitBatch) {
1207         int64_t bySeqno = request->getDocMeta().bySeqno;
1208         maxDBSeqno = std::max(maxDBSeqno, bySeqno);
1209 
1210         status = addRequestToWriteBatch(*vbh, batch, request.get());
1211         if (!status.ok()) {
1212             logger.log(EXTENSION_LOG_WARNING,
1213                        "RocksDBKVStore::saveDocs: addRequestToWriteBatch "
1214                        "error:%d, vb:%" PRIu16,
1215                        status.code(),
1216                        vbid);
1217             return status;
1218         }
1219 
1220         // Check if we should split into a new writeBatch if the batch size
1221         // exceeds the write_buffer_size - this is necessary because we
1222         // don't want our WriteBatch to exceed the configured memtable size, as
1223         // that can cause significant memory bloating (see MB-26521).
1224         // Note the limit check is only approximate, as the batch contains
1225         // updates for at least 2 CFs (key & seqno) which will be written into
1226         // separate memtables, so we don't exactly know the size contribution
1227         // to each memtable in the batch.
1228         const auto batchLimit = defaultCFOptions.write_buffer_size +
1229                                 seqnoCFOptions.write_buffer_size;
1230         if (batch.GetDataSize() > batchLimit) {
1231             status = writeAndTimeBatch(batch);
1232             if (!status.ok()) {
1233                 logger.log(EXTENSION_LOG_WARNING,
1234                            "RocksDBKVStore::saveDocs: rocksdb::DB::Write "
1235                            "error:%d, "
1236                            "vb:%" PRIu16,
1237                            status.code(),
1238                            vbid);
1239                 return status;
1240             }
1241             batch.Clear();
1242         }
1243     }
1244 
1245     status = saveVBStateToBatch(*vbh, *vbstate, batch);
1246     if (!status.ok()) {
1247         logger.log(EXTENSION_LOG_WARNING,
1248                    "RocksDBKVStore::saveDocs: saveVBStateToBatch error:%d",
1249                    status.code());
1250         return status;
1251     }
1252 
1253     status = writeAndTimeBatch(batch);
1254     if (!status.ok()) {
1255         logger.log(EXTENSION_LOG_WARNING,
1256                    "RocksDBKVStore::saveDocs: rocksdb::DB::Write error:%d, "
1257                    "vb:%" PRIu16,
1258                    status.code(),
1259                    vbid);
1260         return status;
1261     }
1262 
1263     st.batchSize.add(reqsSize);
1264     st.docsCommitted = reqsSize;
1265 
1266     // Update high seqno
1267     vbstate->highSeqno = maxDBSeqno;
1268 
1269     return rocksdb::Status::OK();
1270 }
1271 
addRequestToWriteBatch( const VBHandle& vbh, rocksdb::WriteBatch& batch, RocksRequest* request)1272 rocksdb::Status RocksDBKVStore::addRequestToWriteBatch(
1273         const VBHandle& vbh,
1274         rocksdb::WriteBatch& batch,
1275         RocksRequest* request) {
1276     uint16_t vbid = request->getVBucketId();
1277 
1278     rocksdb::Slice keySlice = getKeySlice(request->getKey());
1279     rocksdb::SliceParts keySliceParts(&keySlice, 1);
1280 
1281     rocksdb::Slice docSlices[] = {request->getDocMetaSlice(),
1282                                   request->getDocBodySlice()};
1283     rocksdb::SliceParts valueSliceParts(docSlices, 2);
1284 
1285     rocksdb::Slice bySeqnoSlice = getSeqnoSlice(&request->getDocMeta().bySeqno);
1286     // We use the `saveDocsHisto` to track the time spent on
1287     // `rocksdb::WriteBatch::Put()`.
1288     auto begin = ProcessClock::now();
1289     auto status =
1290             batch.Put(vbh.defaultCFH.get(), keySliceParts, valueSliceParts);
1291     if (!status.ok()) {
1292         logger.log(EXTENSION_LOG_WARNING,
1293                    "RocksDBKVStore::saveDocs: rocksdb::WriteBatch::Put "
1294                    "[ColumnFamily: \'default\']  error:%d, "
1295                    "vb:%" PRIu16,
1296                    status.code(),
1297                    vbid);
1298         return status;
1299     }
1300     status = batch.Put(vbh.seqnoCFH.get(), bySeqnoSlice, keySlice);
1301     if (!status.ok()) {
1302         logger.log(EXTENSION_LOG_WARNING,
1303                    "RocksDBKVStore::saveDocs: rocksdb::WriteBatch::Put "
1304                    "[ColumnFamily: \'seqno\']  error:%d, "
1305                    "vb:%" PRIu16,
1306                    status.code(),
1307                    vbid);
1308         return status;
1309     }
1310     st.saveDocsHisto.add(std::chrono::duration_cast<std::chrono::microseconds>(
1311             ProcessClock::now() - begin));
1312 
1313     return rocksdb::Status::OK();
1314 }
1315 
readHighSeqnoFromDisk(const VBHandle& vbh)1316 int64_t RocksDBKVStore::readHighSeqnoFromDisk(const VBHandle& vbh) {
1317     std::unique_ptr<rocksdb::Iterator> it(
1318             rdb->NewIterator(rocksdb::ReadOptions(), vbh.seqnoCFH.get()));
1319 
1320     // Seek to the highest seqno=>key mapping stored for the vbid
1321     auto maxSeqno = std::numeric_limits<int64_t>::max();
1322     rocksdb::Slice maxSeqnoSlice = getSeqnoSlice(&maxSeqno);
1323     it->SeekForPrev(maxSeqnoSlice);
1324 
1325     if (!it->Valid()) {
1326         return 0;
1327     }
1328     auto highSeqno = getNumericSeqno(it->key());
1329     // We use a negative seqno as key for VBState. Do not consider it.
1330     return highSeqno >= 0 ? highSeqno : 0;
1331 }
1332 
getVbstateKey()1333 int64_t RocksDBKVStore::getVbstateKey() {
1334     // We put the VBState into the SeqnoCF. As items in the SeqnoCF are ordered
1335     // by increasing-seqno, we reserve a negative special key to VBState so
1336     // that we can access it in O(1).
1337     return -9999;
1338 }
1339 
initScanContext( std::shared_ptr<StatusCallback<GetValue>> cb, std::shared_ptr<StatusCallback<CacheLookup>> cl, uint16_t vbid, uint64_t startSeqno, DocumentFilter options, ValueFilter valOptions)1340 ScanContext* RocksDBKVStore::initScanContext(
1341         std::shared_ptr<StatusCallback<GetValue>> cb,
1342         std::shared_ptr<StatusCallback<CacheLookup>> cl,
1343         uint16_t vbid,
1344         uint64_t startSeqno,
1345         DocumentFilter options,
1346         ValueFilter valOptions) {
1347     size_t scanId = scanCounter++;
1348 
1349     {
1350         std::lock_guard<std::mutex> lg(scanSnapshotsMutex);
1351         scanSnapshots.emplace(scanId, SnapshotPtr(rdb->GetSnapshot(), *rdb));
1352     }
1353 
1354     // As we cannot efficiently determine how many documents this scan will
1355     // find, we approximate this value with the seqno difference + 1
1356     // as scan is supposed to be inclusive at both ends,
1357     // seqnos 2 to 4 covers 3 docs not 4 - 2 = 2
1358 
1359     uint64_t endSeqno = cachedVBStates[vbid]->highSeqno;
1360     return new ScanContext(cb,
1361                            cl,
1362                            vbid,
1363                            scanId,
1364                            startSeqno,
1365                            endSeqno,
1366                            0, /*TODO RDB: pass the real purge-seqno*/
1367                            options,
1368                            valOptions,
1369                            /* documentCount */ endSeqno - startSeqno + 1,
1370                            configuration);
1371 }
1372 
scan(ScanContext* ctx)1373 scan_error_t RocksDBKVStore::scan(ScanContext* ctx) {
1374     if (!ctx) {
1375         return scan_failed;
1376     }
1377 
1378     if (ctx->lastReadSeqno == ctx->maxSeqno) {
1379         return scan_success;
1380     }
1381 
1382     auto startSeqno = ctx->startSeqno;
1383     if (ctx->lastReadSeqno != 0) {
1384         startSeqno = ctx->lastReadSeqno + 1;
1385     }
1386 
1387     TRACE_EVENT2("RocksDBKVStore",
1388                  "scan",
1389                  "vbid",
1390                  ctx->vbid,
1391                  "startSeqno",
1392                  startSeqno);
1393 
1394     GetMetaOnly isMetaOnly = ctx->valFilter == ValueFilter::KEYS_ONLY
1395                                      ? GetMetaOnly::Yes
1396                                      : GetMetaOnly::No;
1397 
1398     rocksdb::ReadOptions snapshotOpts{rocksdb::ReadOptions()};
1399 
1400     // Lock for safe access to the scanSnapshots map and to ensure the snapshot
1401     // doesn't get destroyed whilst we have the pointer.
1402     // @todo use a shared_ptr and reduce the lock scope to just the map::at call
1403     std::lock_guard<std::mutex> lg(scanSnapshotsMutex);
1404     snapshotOpts.snapshot = scanSnapshots.at(ctx->scanId).get();
1405 
1406     rocksdb::Slice startSeqnoSlice = getSeqnoSlice(&startSeqno);
1407     const auto vbh = getVBHandle(ctx->vbid);
1408     std::unique_ptr<rocksdb::Iterator> it(
1409             rdb->NewIterator(snapshotOpts, vbh->seqnoCFH.get()));
1410     if (!it) {
1411         throw std::logic_error(
1412                 "RocksDBKVStore::scan: rocksdb::Iterator to Seqno Column "
1413                 "Family is nullptr");
1414     }
1415     it->Seek(startSeqnoSlice);
1416 
1417     rocksdb::Slice endSeqnoSlice = getSeqnoSlice(&ctx->maxSeqno);
1418     auto isPastEnd = [&endSeqnoSlice, this](rocksdb::Slice seqSlice) {
1419         return seqnoComparator.Compare(seqSlice, endSeqnoSlice) == 1;
1420     };
1421 
1422     for (; it->Valid() && !isPastEnd(it->key()); it->Next()) {
1423         scanTotalSeqnoHits++;
1424         auto seqno = getNumericSeqno(it->key());
1425         rocksdb::Slice keySlice = it->value();
1426         std::string valueStr;
1427         auto s = rdb->Get(
1428                 snapshotOpts, vbh->defaultCFH.get(), keySlice, &valueStr);
1429 
1430         if (!s.ok()) {
1431             // TODO RDB: Old seqnos are never removed from the db!
1432             // If the item does not exist (s.isNotFound())
1433             // the seqno => key mapping could be removed; not even
1434             // a tombstone remains of that item.
1435 
1436             // Note: I account also the hits for deleted documents because it
1437             // is logically correct. But, we switch on the RocksDB built-in
1438             // Bloom Filter by default and we try to keep all the Filter blocks
1439             // in the BlockCache. So, I expect that the impact of old-seqno hits
1440             // is minimum in this case.
1441             scanOldSeqnoHits++;
1442 
1443             continue;
1444         }
1445 
1446         rocksdb::Slice valSlice(valueStr);
1447 
1448         // TODO RDB: Deal with collections
1449         DocKey key(reinterpret_cast<const uint8_t*>(keySlice.data()),
1450                    keySlice.size(),
1451                    DocNamespace::DefaultCollection);
1452 
1453         std::unique_ptr<Item> itm =
1454                 makeItem(ctx->vbid, key, valSlice, isMetaOnly);
1455 
1456         if (itm->getBySeqno() > seqno) {
1457             // TODO RDB: Old seqnos are never removed from the db!
1458             // If the item has a newer seqno now, the stale
1459             // seqno => key mapping could be removed
1460             scanOldSeqnoHits++;
1461             continue;
1462         } else if (itm->getBySeqno() < seqno) {
1463             throw std::logic_error(
1464                     "RocksDBKVStore::scan: index has a higher seqno"
1465                     "than the document in a snapshot!");
1466         }
1467 
1468         bool includeDeletes =
1469                 (ctx->docFilter == DocumentFilter::NO_DELETES) ? false : true;
1470         bool onlyKeys =
1471                 (ctx->valFilter == ValueFilter::KEYS_ONLY) ? true : false;
1472 
1473         if (!includeDeletes && itm->isDeleted()) {
1474             continue;
1475         }
1476         int64_t byseqno = itm->getBySeqno();
1477         CacheLookup lookup(key,
1478                            byseqno,
1479                            ctx->vbid,
1480                            ctx->collectionsContext.getSeparator());
1481         ctx->lookup->callback(lookup);
1482 
1483         int status = ctx->lookup->getStatus();
1484 
1485         if (status == ENGINE_KEY_EEXISTS) {
1486             ctx->lastReadSeqno = byseqno;
1487             continue;
1488         } else if (status == ENGINE_ENOMEM) {
1489             return scan_again;
1490         }
1491 
1492         GetValue rv(std::move(itm), ENGINE_SUCCESS, -1, onlyKeys);
1493         ctx->callback->callback(rv);
1494         status = ctx->callback->getStatus();
1495 
1496         if (status == ENGINE_ENOMEM) {
1497             return scan_again;
1498         }
1499 
1500         ctx->lastReadSeqno = byseqno;
1501     }
1502 
1503     cb_assert(it->status().ok()); // Check for any errors found during the scan
1504 
1505     return scan_success;
1506 }
1507 
destroyScanContext(ScanContext* ctx)1508 void RocksDBKVStore::destroyScanContext(ScanContext* ctx) {
1509     if (ctx == nullptr) {
1510         return;
1511     }
1512     std::lock_guard<std::mutex> lg(scanSnapshotsMutex);
1513     // TODO RDB: Might be nice to have the snapshot in the ctx and
1514     // release it on destruction
1515     auto it = scanSnapshots.find(ctx->scanId);
1516     if (it != scanSnapshots.end()) {
1517         scanSnapshots.erase(it);
1518     }
1519     delete ctx;
1520 }
1521 
getStatFromMemUsage( const rocksdb::MemoryUtil::UsageType type, size_t& value)1522 bool RocksDBKVStore::getStatFromMemUsage(
1523         const rocksdb::MemoryUtil::UsageType type, size_t& value) {
1524     std::vector<rocksdb::DB*> dbs = {rdb.get()};
1525     auto cache_set = getCachePointers();
1526     std::map<rocksdb::MemoryUtil::UsageType, uint64_t> usageByType;
1527 
1528     auto status = rocksdb::MemoryUtil::GetApproximateMemoryUsageByType(
1529             dbs, cache_set, &usageByType);
1530     if (!status.ok()) {
1531         logger.log(EXTENSION_LOG_NOTICE,
1532                    "RocksDBKVStore::getStatFromMemUsage: "
1533                    "GetApproximateMemoryUsageByType error: %s",
1534                    status.getState());
1535         return false;
1536     }
1537 
1538     value = usageByType.at(type);
1539 
1540     return true;
1541 }
1542 
getStatFromStatistics(const rocksdb::Tickers ticker, size_t& value)1543 bool RocksDBKVStore::getStatFromStatistics(const rocksdb::Tickers ticker,
1544                                            size_t& value) {
1545     const auto statistics = rdb->GetDBOptions().statistics;
1546     if (!statistics) {
1547         return false;
1548     }
1549     value = statistics->getTickerCount(ticker);
1550     return true;
1551 }
1552 
getStatFromProperties(ColumnFamily cf, const std::string& property, size_t& value)1553 bool RocksDBKVStore::getStatFromProperties(ColumnFamily cf,
1554                                            const std::string& property,
1555                                            size_t& value) {
1556     value = 0;
1557     std::lock_guard<std::mutex> lg(vbhMutex);
1558     for (const auto vbh : vbHandles) {
1559         if (vbh) {
1560             rocksdb::ColumnFamilyHandle* cfh = nullptr;
1561             switch (cf) {
1562             case ColumnFamily::Default:
1563                 cfh = vbh->defaultCFH.get();
1564                 break;
1565             case ColumnFamily::Seqno:
1566                 cfh = vbh->seqnoCFH.get();
1567                 break;
1568             }
1569             if (!cfh) {
1570                 return false;
1571             }
1572             std::string out;
1573             if (!rdb->GetProperty(cfh, property, &out)) {
1574                 return false;
1575             }
1576             value += std::stoull(out);
1577         }
1578     }
1579 
1580     return true;
1581 }
1582 
1583 // As we implement a VBucket as a pair of two Column Families (a 'default' CF
1584 // and a 'local+seqno' CF), we need to re-set the 'write_buffer_size' for each
1585 // CF when the number of VBuckets managed by the current store changes. The
1586 // goal is to keep the total allocation for all the Memtables under the
1587 // 'rocksdb_memtables_ratio' given in configuration.
1588 // Thus, this function performs the following basic steps:
1589 //     1) Re-calculate the new sizes of all Memtables;
1590 //     2) Apply the new sizes.
1591 // We apply the new sizes using the rocksdb::DB::SetOptions() API. The
1592 // 'write_buffer_size' is a dynamically changeable option. This call changes
1593 // the size of mutable Memtables instantly. If the new size is below the
1594 // current allocation for the Memtable, the next key-value pair added will mark
1595 // the Memtable as immutable and will trigger a flush.
applyMemtablesQuota( const std::lock_guard<std::mutex>& lock)1596 void RocksDBKVStore::applyMemtablesQuota(
1597         const std::lock_guard<std::mutex>& lock) {
1598     const auto vbuckets = getVBucketsCount(lock);
1599 
1600     auto& configuration =
1601             dynamic_cast<RocksDBKVStoreConfig&>(this->configuration);
1602 
1603     // 1) If configuration.getMemtablesRatio() == 0.0, then
1604     //      we just want to use the baseline write_buffer_size.
1605     // 2) If vbuckets == 0, then there is no Memtable (this happens only
1606     //      when the underlying RocksDB instance has just been created).
1607     // On both cases the following logic does not apply, so the
1608     // write_buffer_size for both the 'default' and the 'seqno' CFs is left
1609     // to the baseline value.
1610     if (configuration.getMemtablesRatio() > 0.0 && vbuckets > 0) {
1611         const auto memtablesQuota = configuration.getBucketQuota() /
1612                                     configuration.getMaxShards() *
1613                                     configuration.getMemtablesRatio();
1614         // TODO: for now I am hard-coding the percentage of Memtables Quota
1615         // that we allocate for the 'deafult' (90%) and 'seqno' (10%) CFs. The
1616         // plan is to expose this percentage as a configuration parameter in a
1617         // follow-up patch.
1618         const auto defaultCFMemtablesQuota = memtablesQuota * 0.9;
1619         const auto seqnoCFMemtablesQuota =
1620                 memtablesQuota - defaultCFMemtablesQuota;
1621 
1622         // Set the the write_buffer_size for the 'default' CF
1623         defaultCFOptions.write_buffer_size =
1624                 defaultCFMemtablesQuota / vbuckets /
1625                 defaultCFOptions.max_write_buffer_number;
1626         // Set the write_buffer_size for the 'seqno' CF
1627         seqnoCFOptions.write_buffer_size =
1628                 seqnoCFMemtablesQuota / vbuckets /
1629                 seqnoCFOptions.max_write_buffer_number;
1630 
1631         // Apply the new write_buffer_size
1632         const std::unordered_map<std::string, std::string>
1633                 newDefaultCFWriteBufferSize{std::make_pair(
1634                         "write_buffer_size",
1635                         std::to_string(defaultCFOptions.write_buffer_size))};
1636         const std::unordered_map<std::string, std::string>
1637                 newSeqnoCFWriteBufferSize{std::make_pair(
1638                         "write_buffer_size",
1639                         std::to_string(seqnoCFOptions.write_buffer_size))};
1640         for (const auto& vbh : vbHandles) {
1641             if (vbh) {
1642                 auto status = rdb->SetOptions(vbh->defaultCFH.get(),
1643                                               newDefaultCFWriteBufferSize);
1644                 if (!status.ok()) {
1645                     throw std::runtime_error(
1646                             "RocksDBKVStore::applyMemtablesQuota: SetOptions "
1647                             "failed for [vbid: " +
1648                             std::to_string(vbh->vbid) + ", CF: default]: " +
1649                             status.getState());
1650                 }
1651                 status = rdb->SetOptions(vbh->seqnoCFH.get(),
1652                                          newSeqnoCFWriteBufferSize);
1653                 if (!status.ok()) {
1654                     throw std::runtime_error(
1655                             "RocksDBKVStore::applyMemtablesQuota: SetOptions "
1656                             "failed for [vbid: " +
1657                             std::to_string(vbh->vbid) + ", CF: seqno]: " +
1658                             status.getState());
1659                 }
1660             }
1661         }
1662     }
1663 
1664     // Overwrite Compaction options if Compaction Optimization is enabled
1665     // for the 'default' CF
1666     if (configuration.getDefaultCfOptimizeCompaction() == "level") {
1667         defaultCFOptions.OptimizeLevelStyleCompaction(
1668                 defaultCFOptions.write_buffer_size);
1669     } else if (configuration.getDefaultCfOptimizeCompaction() == "universal") {
1670         defaultCFOptions.OptimizeUniversalStyleCompaction(
1671                 defaultCFOptions.write_buffer_size);
1672     }
1673     // Overwrite Compaction options if Compaction Optimization is enabled
1674     // for the 'seqno' CF
1675     if (configuration.getSeqnoCfOptimizeCompaction() == "level") {
1676         seqnoCFOptions.OptimizeLevelStyleCompaction(
1677                 seqnoCFOptions.write_buffer_size);
1678     } else if (configuration.getSeqnoCfOptimizeCompaction() == "universal") {
1679         seqnoCFOptions.OptimizeUniversalStyleCompaction(
1680                 seqnoCFOptions.write_buffer_size);
1681     }
1682 }
1683 
getVBucketsCount( const std::lock_guard<std::mutex>&) const1684 size_t RocksDBKVStore::getVBucketsCount(
1685         const std::lock_guard<std::mutex>&) const {
1686     uint16_t count = 0;
1687     for (const auto& vbh : vbHandles) {
1688         if (vbh) {
1689             count++;
1690         }
1691     }
1692     return count;
1693 }
1694