1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2017 Couchbase, Inc.
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "config.h"
19
20#include "rocksdb-kvstore.h"
21#include "rocksdb-kvstore_config.h"
22
23#include "ep_time.h"
24
25#include "kvstore_priv.h"
26
27#include <phosphor/phosphor.h>
28#include <platform/sysinfo.h>
29#include <rocksdb/convenience.h>
30#include <rocksdb/filter_policy.h>
31
32#include <stdio.h>
33#include <string.h>
34#include <algorithm>
35#include <gsl/gsl>
36#include <limits>
37#include <thread>
38
39#include "vbucket.h"
40
namespace rockskv {
/**
 * Fixed-layout metadata record kept for every Document stored in RocksDB.
 *
 * It is filled in when a mutation request is prepared for writing and is the
 * structure the stored bytes are interpreted as when a Document is read back.
 */
class MetaData {
public:
    // An all-zero record; delegates to the field-wise constructor.
    MetaData() : MetaData(false, 0, 0, 0, 0, 0, 0, 0, 0) {
    }

    /**
     * Field-wise constructor; every argument is copied into the member of the
     * same name.
     */
    MetaData(bool deleted,
             uint8_t version,
             uint8_t datatype,
             uint32_t flags,
             uint32_t valueSize,
             time_t exptime,
             uint64_t cas,
             uint64_t revSeqno,
             int64_t bySeqno)
        : deleted(deleted),
          version(version),
          datatype(datatype),
          flags(flags),
          valueSize(valueSize),
          exptime(exptime),
          cas(cas),
          revSeqno(revSeqno),
          bySeqno(bySeqno) {
    }

// The `#pragma pack(1)` directive and the order of the members are meant to
// keep MetaData as small as possible and with the same size on different
// platforms. Do not reorder the members below.
// NOTE(review): the width of `time_t` is implementation-defined - confirm that
// it matches on every platform that shares the stored data.
#pragma pack(1)
    // 'deleted' and 'version' share a single byte (1 + 7 bits)
    uint8_t deleted : 1;
    uint8_t version : 7;
    uint8_t datatype;
    uint32_t flags;
    uint32_t valueSize;
    time_t exptime;
    uint64_t cas;
    uint64_t revSeqno;
    int64_t bySeqno;
#pragma pack()
};
} // namespace rockskv
92
93/**
94 * Class representing a document to be persisted in RocksDB.
95 */
96class RocksRequest : public IORequest {
97public:
98    /**
99     * Constructor
100     *
101     * @param item Item instance to be persisted
102     * @param callback Persistence Callback
103     * @param del Flag indicating if it is an item deletion or not
104     */
105    RocksRequest(const Item& item, MutationRequestCallback& callback)
106        : IORequest(item.getVBucketId(),
107                    callback,
108                    item.isDeleted(),
109                    item.getKey()),
110          docBody(item.getValue()) {
111        docMeta = rockskv::MetaData(
112                item.isDeleted(),
113                0,
114                item.getDataType(),
115                item.getFlags(),
116                item.getNBytes(),
117                item.isDeleted() ? ep_real_time() : item.getExptime(),
118                item.getCas(),
119                item.getRevSeqno(),
120                item.getBySeqno());
121    }
122
123    const rockskv::MetaData& getDocMeta() {
124        return docMeta;
125    }
126
127    // Get a rocksdb::Slice wrapping the Document MetaData
128    rocksdb::Slice getDocMetaSlice() {
129        return rocksdb::Slice(reinterpret_cast<char*>(&docMeta),
130                              sizeof(docMeta));
131    }
132
133    // Get a rocksdb::Slice wrapping the Document Body
134    rocksdb::Slice getDocBodySlice() {
135        const char* data = docBody ? docBody->getData() : nullptr;
136        size_t size = docBody ? docBody->valueSize() : 0;
137        return rocksdb::Slice(data, size);
138    }
139
140private:
141    rockskv::MetaData docMeta;
142    value_t docBody;
143};
144
145// RocksDB docs suggest to "Use `rocksdb::DB::DestroyColumnFamilyHandle()` to
146// close a column family instead of deleting the column family handle directly"
147struct ColumnFamilyDeleter {
148    ColumnFamilyDeleter(rocksdb::DB& db) : db(db) {
149    }
150    void operator()(rocksdb::ColumnFamilyHandle* cfh) {
151        db.DestroyColumnFamilyHandle(cfh);
152    }
153
154private:
155    rocksdb::DB& db;
156};
157using ColumnFamilyPtr =
158        std::unique_ptr<rocksdb::ColumnFamilyHandle, ColumnFamilyDeleter>;
159
160// The 'VBHandle' class is a wrapper around the ColumnFamilyHandles
161// for a VBucket.
162class VBHandle {
163public:
164    VBHandle(rocksdb::DB& rdb,
165             rocksdb::ColumnFamilyHandle* defaultCFH,
166             rocksdb::ColumnFamilyHandle* seqnoCFH,
167             uint16_t vbid)
168        : rdb(rdb),
169          defaultCFH(ColumnFamilyPtr(defaultCFH, rdb)),
170          seqnoCFH(ColumnFamilyPtr(seqnoCFH, rdb)),
171          vbid(vbid) {
172    }
173
174    void dropColumnFamilies() {
175        // The call to DropColumnFamily() records the drop in the Manifest, but
176        // the actual remove will happen when the ColumFamilyHandle is deleted.
177        auto status = rdb.DropColumnFamily(defaultCFH.get());
178        if (!status.ok()) {
179            throw std::runtime_error(
180                    "VBHandle::dropColumnFamilies: DropColumnFamily failed for "
181                    "[vbid: " +
182                    std::to_string(vbid) + ", CF: default]: " +
183                    status.getState());
184        }
185        status = rdb.DropColumnFamily(seqnoCFH.get());
186        if (!status.ok()) {
187            throw std::runtime_error(
188                    "VBHandle::dropColumnFamilies: DropColumnFamily failed for "
189                    "[vbid: " +
190                    std::to_string(vbid) + ", CF: seqno]: " +
191                    status.getState());
192        }
193    }
194
195    rocksdb::DB& rdb;
196    const ColumnFamilyPtr defaultCFH;
197    const ColumnFamilyPtr seqnoCFH;
198    const uint16_t vbid;
199};
200
201RocksDBKVStore::RocksDBKVStore(RocksDBKVStoreConfig& configuration)
202    : KVStore(configuration),
203      vbHandles(configuration.getMaxVBuckets()),
204      in_transaction(false),
205      scanCounter(0),
206      logger(configuration.getLogger()) {
207    cachedVBStates.resize(configuration.getMaxVBuckets());
208    writeOptions.sync = true;
209
210    // The RocksDB Options is a set of DBOptions and ColumnFamilyOptions.
211    // Together they cover all RocksDB available parameters.
212    auto status = rocksdb::GetDBOptionsFromString(
213            dbOptions, configuration.getDBOptions(), &dbOptions);
214    if (!status.ok()) {
215        throw std::invalid_argument(
216                std::string("RocksDBKVStore::open: GetDBOptionsFromString "
217                            "error: ") +
218                status.getState());
219    }
220
221    // Set number of background threads - note these are per-environment, so
222    // are shared across all DB instances (vBuckets) and all Buckets.
223    auto lowPri = configuration.getLowPriBackgroundThreads();
224    if (lowPri == 0) {
225        lowPri = cb::get_available_cpu_count();
226    }
227    rocksdb::Env::Default()->SetBackgroundThreads(lowPri, rocksdb::Env::LOW);
228
229    auto highPri = configuration.getHighPriBackgroundThreads();
230    if (highPri == 0) {
231        highPri = cb::get_available_cpu_count();
232    }
233    rocksdb::Env::Default()->SetBackgroundThreads(highPri, rocksdb::Env::HIGH);
234
235    dbOptions.create_if_missing = true;
236
237    // We use EventListener to set the correct ThreadLocal engine in the
238    // ObjectRegistry for the RocksDB Flusher and Compactor threads. This
239    // allows the memory tracker to track allocations and deallocations against
240    // the appropriate bucket.
241    auto eventListener =
242            std::make_shared<EventListener>(ObjectRegistry::getCurrentEngine());
243    dbOptions.listeners.emplace_back(eventListener);
244
245    // Enable Statistics if 'Statistics::stat_level_' is provided by the
246    // configuration. We create a statistics object and pass to the multiple
247    // DBs managed by the same KVStore. Then the statistics object will contain
248    // aggregated values for all those DBs. Note that some stats are undefined
249    // and have no meaningful information across multiple DBs (e.g.,
250    // "rocksdb.sequence.number").
251    if (!configuration.getStatsLevel().empty()) {
252        dbOptions.statistics = rocksdb::CreateDBStatistics();
253        dbOptions.statistics->stats_level_ =
254                getStatsLevel(configuration.getStatsLevel());
255    }
256
257    // Apply the environment rate limit for Flush and Compaction
258    dbOptions.rate_limiter = configuration.getEnvRateLimiter();
259
260    // Allocate the per-shard Block Cache
261    if (configuration.getBlockCacheRatio() > 0.0) {
262        auto blockCacheQuota = configuration.getBucketQuota() *
263                               configuration.getBlockCacheRatio();
264        // Keeping default settings for:
265        // num_shard_bits = -1 (automatically determined)
266        // strict_capacity_limit = false (do not fail insert when cache is full)
267        blockCache = rocksdb::NewLRUCache(
268                blockCacheQuota / configuration.getMaxShards(),
269                -1 /*num_shard_bits*/,
270                false /*strict_capacity_limit*/,
271                configuration.getBlockCacheHighPriPoolRatio());
272    }
273    // Configure all the Column Families
274    const auto& cfOptions = configuration.getCFOptions();
275    const auto& bbtOptions = configuration.getBBTOptions();
276    defaultCFOptions = getBaselineDefaultCFOptions();
277    seqnoCFOptions = getBaselineSeqnoCFOptions();
278    applyUserCFOptions(defaultCFOptions, cfOptions, bbtOptions);
279    applyUserCFOptions(seqnoCFOptions, cfOptions, bbtOptions);
280
281    // Open the DB and load the ColumnFamilyHandle for all the
282    // existing Column Families (populates the 'vbHandles' vector)
283    openDB();
284
285    // Calculate and apply the correct write_buffer_size for all Column
286    // Families. The Memtable size of each CF depends on the count of existing
287    // CFs in DB (besides other things). Thus, this must be called after
288    // 'openDB' (so that all the existing CFs have been loaded).
289    applyMemtablesQuota(std::lock_guard<std::mutex>(vbhMutex));
290
291    // Read persisted VBs state
292    for (const auto vbh : vbHandles) {
293        if (vbh) {
294            readVBState(*vbh);
295            // Update stats
296            ++st.numLoadedVb;
297        }
298    }
299}
300
301RocksDBKVStore::~RocksDBKVStore() {
302    // Guarantees that all the ColumnFamilyHandles for the existing VBuckets
303    // are released before 'rdb' is deleted. From RocksDB docs:
304    //     "Before delete DB, you have to close All column families by calling
305    //      DestroyColumnFamilyHandle() with all the handles."
306    vbHandles.clear();
307    in_transaction = false;
308}
309
310void RocksDBKVStore::openDB() {
311    auto dbname = getDBSubdir();
312    createDataDir(dbname);
313
314    std::vector<std::string> cfs;
315    auto status = rocksdb::DB::ListColumnFamilies(dbOptions, dbname, &cfs);
316    if (!status.ok()) {
317        // If ListColumnFamilies failed because the DB does not exist,
318        // then it will be created the first time we call 'rocksdb::DB::Open'.
319        // Else, we throw an error if ListColumnFamilies failed for any other
320        // unexpected reason.
321        if (!(status.code() == rocksdb::Status::kIOError &&
322              std::string(status.getState())
323                              .find("No such file or directory") !=
324                      std::string::npos)) {
325            throw std::runtime_error(
326                    "RocksDBKVStore::openDB: ListColumnFamilies failed for DB "
327                    "'" +
328                    dbname + "': " + status.getState());
329        }
330    }
331
332    // We need to pass a ColumnFamilyDescriptor for every existing CF.
333    // We populate 'cfDescriptors' so that it results in a vector
334    // containing packed CFs for every VBuckets, e.g. with
335    // MaxShards=4:
336    //     cfDescriptors[0] = default_0
337    //     cfDescriptors[1] = seqno_0
338    //     cfDescriptors[2] = default_4
339    //     cfDescriptors[3] = seqno_4
340    //     ..
341    // That helps us in populating 'vbHandles' later, because after
342    // 'rocksdb::DB::Open' handles[i] will be the handle that we will use
343    // to operate on the ColumnFamily at cfDescriptors[i].
344    std::vector<rocksdb::ColumnFamilyDescriptor> cfDescriptors;
345    for (uint16_t vbid = 0; vbid < configuration.getMaxVBuckets(); vbid++) {
346        if ((vbid % configuration.getMaxShards()) ==
347            configuration.getShardId()) {
348            std::string defaultCF = "default_" + std::to_string(vbid);
349            std::string seqnoCF = "local+seqno_" + std::to_string(vbid);
350            if (std::find(cfs.begin(), cfs.end(), defaultCF) != cfs.end()) {
351                if (std::find(cfs.begin(), cfs.end(), seqnoCF) == cfs.end()) {
352                    throw std::logic_error("RocksDBKVStore::openDB: DB '" +
353                                           dbname +
354                                           "' is in inconsistent state: CF " +
355                                           seqnoCF + " not found.");
356                }
357                cfDescriptors.emplace_back(defaultCF, defaultCFOptions);
358                cfDescriptors.emplace_back(seqnoCF, seqnoCFOptions);
359            }
360        }
361    }
362
363    // TODO: The RocksDB built-in 'default' CF always exists, need to check if
364    // we can drop it.
365    cfDescriptors.emplace_back(rocksdb::kDefaultColumnFamilyName,
366                               rocksdb::ColumnFamilyOptions());
367    std::vector<rocksdb::ColumnFamilyHandle*> handles;
368    rocksdb::DB* db;
369    status = rocksdb::DB::Open(dbOptions, dbname, cfDescriptors, &handles, &db);
370    if (!status.ok()) {
371        throw std::runtime_error(
372                "RocksDBKVStore::openDB: Open failed for database '" + dbname +
373                "': " + status.getState());
374    }
375    rdb.reset(db);
376
377    // The way we populated 'cfDescriptors' guarantees that: if 'cfDescriptors'
378    // contains more than only the RocksDB 'default' CF (i.e.,
379    // '(cfDescriptors.size() - 1) > 0') then 'cfDescriptors[i]' and
380    // 'cfDescriptors[i+1]' are respectively the 'default_' and 'seqno'_ CFs
381    // for a certain VBucket.
382    for (uint16_t i = 0; i < (cfDescriptors.size() - 1); i += 2) {
383        // Note: any further sanity-check is redundant as we will have always
384        // 'cf = "default_<vbid>"'.
385        const auto& cf = cfDescriptors[i].name;
386        uint16_t vbid = std::stoi(cf.substr(8));
387        vbHandles[vbid] = std::make_shared<VBHandle>(
388                *rdb, handles[i], handles[i + 1], vbid);
389    }
390
391    // We need to release the ColumnFamilyHandle for the built-in 'default' CF
392    // here, as it is not managed by any VBHandle.
393    rdb->DestroyColumnFamilyHandle(handles.back());
394}
395
396std::shared_ptr<VBHandle> RocksDBKVStore::getVBHandle(uint16_t vbid) {
397    std::lock_guard<std::mutex> lg(vbhMutex);
398    if (vbHandles[vbid]) {
399        return vbHandles[vbid];
400    }
401
402    // If the VBHandle for vbid does not exist it means that we need to create
403    // the VBucket, i.e. we need to create the set of CFs on DB for vbid
404    std::vector<rocksdb::ColumnFamilyDescriptor> cfDescriptors;
405    auto vbid_ = std::to_string(vbid);
406    cfDescriptors.emplace_back("default_" + vbid_, defaultCFOptions);
407    cfDescriptors.emplace_back("local+seqno_" + vbid_, seqnoCFOptions);
408
409    std::vector<rocksdb::ColumnFamilyHandle*> handles;
410    auto status = rdb->CreateColumnFamilies(cfDescriptors, &handles);
411    if (!status.ok()) {
412        for (auto* cfh : handles) {
413            status = rdb->DropColumnFamily(cfh);
414            if (!status.ok()) {
415                throw std::runtime_error(
416                        "RocksDBKVStore::getVBHandle: DropColumnFamily failed "
417                        "for CF " +
418                        cfh->GetName() + ": " + status.getState());
419            }
420        }
421        throw std::runtime_error(
422                "RocksDBKVStore::getVBHandle: CreateColumnFamilies failed for "
423                "vbid " +
424                std::to_string(vbid) + ": " + status.getState());
425    }
426
427    vbHandles[vbid] =
428            std::make_shared<VBHandle>(*rdb, handles[0], handles[1], vbid);
429
430    // The number of VBuckets has increased, we need to re-balance the
431    // Memtables Quota among the CFs of existing VBuckets.
432    applyMemtablesQuota(lg);
433
434    return vbHandles[vbid];
435}
436
437std::string RocksDBKVStore::getDBSubdir() {
438    return configuration.getDBName() + "/rocksdb." +
439           std::to_string(configuration.getShardId());
440}
441
442bool RocksDBKVStore::begin(std::unique_ptr<TransactionContext> txCtx) {
443    if (!txCtx) {
444        throw std::invalid_argument("RocksDBKVStore::begin: txCtx is null");
445    }
446    in_transaction = true;
447    transactionCtx = std::move(txCtx);
448    return in_transaction;
449}
450
451bool RocksDBKVStore::commit(const Item* collectionsManifest) {
452    // This behaviour is to replicate the one in Couchstore.
453    // If `commit` is called when not in transaction, just return true.
454    if (!in_transaction) {
455        return true;
456    }
457
458    if (pendingReqs.size() == 0) {
459        in_transaction = false;
460        return true;
461    }
462
463    // Swap `pendingReqs` with the temporary `commitBatch` so that we can
464    // shorten the scope of the lock.
465    std::vector<std::unique_ptr<RocksRequest>> commitBatch;
466    {
467        std::lock_guard<std::mutex> lock(writeMutex);
468        std::swap(pendingReqs, commitBatch);
469    }
470
471    bool success = true;
472    auto vbid = commitBatch[0]->getVBucketId();
473
474    // Flush all documents to disk
475    auto status = saveDocs(vbid, collectionsManifest, commitBatch);
476    if (!status.ok()) {
477        logger.log(EXTENSION_LOG_WARNING,
478                   "RocksDBKVStore::commit: saveDocs error:%d, "
479                   "vb:%" PRIu16,
480                   status.code(),
481                   vbid);
482        success = false;
483    }
484
485    commitCallback(status, commitBatch);
486
487    // This behaviour is to replicate the one in Couchstore.
488    // Set `in_transanction = false` only if `commit` is successful.
489    if (success) {
490        in_transaction = false;
491        transactionCtx.reset();
492    }
493
494    return success;
495}
496
497static int getMutationStatus(rocksdb::Status status) {
498    switch (status.code()) {
499    case rocksdb::Status::Code::kOk:
500        return MUTATION_SUCCESS;
501    case rocksdb::Status::Code::kNotFound:
502        // This return value causes ep-engine to drop the failed flush
503        return DOC_NOT_FOUND;
504    case rocksdb::Status::Code::kBusy:
505        // This return value causes ep-engine to keep re-queueing the failed
506        // flush
507        return MUTATION_FAILED;
508    default:
509        throw std::runtime_error(
510                std::string("getMutationStatus: RocksDB error:") +
511                std::string(status.getState()));
512    }
513}
514
515void RocksDBKVStore::commitCallback(
516        rocksdb::Status status,
517        const std::vector<std::unique_ptr<RocksRequest>>& commitBatch) {
518    for (const auto& request : commitBatch) {
519        auto dataSize = request->getDocMetaSlice().size() +
520                        request->getDocBodySlice().size();
521        const auto& key = request->getKey();
522        /* update ep stats */
523        ++st.io_num_write;
524        st.io_write_bytes += (key.size() + dataSize);
525
526        auto rv = getMutationStatus(status);
527        if (request->isDelete()) {
528            if (status.code()) {
529                ++st.numDelFailure;
530            } else {
531                st.delTimeHisto.add(request->getDelta() / 1000);
532            }
533            if (rv != -1) {
534                // TODO: Should set `rv` to 1 or 0 depending on if this is a
535                // delete to an existing (1) or non-existing (0) item. However,
536                // to achieve this we would need to perform a Get to RocksDB
537                // which is costly. For now just assume that the item did exist.
538                rv = 1;
539            }
540            request->getDelCallback()->callback(*transactionCtx, rv);
541        } else {
542            if (status.code()) {
543                ++st.numSetFailure;
544            } else {
545                st.writeTimeHisto.add(request->getDelta() / 1000);
546                st.writeSizeHisto.add(dataSize + key.size());
547            }
548            // TODO: Should set `mr.second` to true or false depending on if
549            // this is an insertion (true) or an update of an existing item
550            // (false). However, to achieve this we would need to perform a Get
551            // to RocksDB which is costly. For now just assume that the item
552            // did not exist.
553            mutation_result mr = std::make_pair(1, true);
554            request->getSetCallback()->callback(*transactionCtx, mr);
555        }
556    }
557}
558
559void RocksDBKVStore::rollback() {
560    if (in_transaction) {
561        in_transaction = false;
562        transactionCtx.reset();
563    }
564}
565
566std::vector<vbucket_state*> RocksDBKVStore::listPersistedVbuckets() {
567    std::vector<vbucket_state*> result;
568    for (const auto& vb : cachedVBStates) {
569        result.emplace_back(vb.get());
570    }
571    return result;
572}
573
574void RocksDBKVStore::set(const Item& item,
575                         Callback<TransactionContext, mutation_result>& cb) {
576    if (!in_transaction) {
577        throw std::logic_error(
578                "RocksDBKVStore::set: in_transaction must be true to perform a "
579                "set operation.");
580    }
581    MutationRequestCallback callback;
582    callback.setCb = &cb;
583    pendingReqs.push_back(std::make_unique<RocksRequest>(item, callback));
584}
585
586GetValue RocksDBKVStore::get(const StoredDocKey& key,
587                             uint16_t vb,
588                             bool fetchDelete) {
589    return getWithHeader(nullptr, key, vb, GetMetaOnly::No, fetchDelete);
590}
591
592GetValue RocksDBKVStore::getWithHeader(void* dbHandle,
593                                       const StoredDocKey& key,
594                                       uint16_t vb,
595                                       GetMetaOnly getMetaOnly,
596                                       bool fetchDelete) {
597    std::string value;
598    const auto vbh = getVBHandle(vb);
599    // TODO RDB: use a PinnableSlice to avoid some memcpy
600    rocksdb::Slice keySlice = getKeySlice(key);
601    rocksdb::Status s = rdb->Get(
602            rocksdb::ReadOptions(), vbh->defaultCFH.get(), keySlice, &value);
603    if (!s.ok()) {
604        return GetValue{NULL, ENGINE_KEY_ENOENT};
605    }
606    return makeGetValue(vb, key, value, getMetaOnly);
607}
608
609void RocksDBKVStore::getMulti(uint16_t vb, vb_bgfetch_queue_t& itms) {
610    // TODO RDB: RocksDB supports a multi get which we should use here.
611    for (auto& it : itms) {
612        auto& key = it.first;
613        rocksdb::Slice keySlice = getKeySlice(key);
614        std::string value;
615        const auto vbh = getVBHandle(vb);
616        rocksdb::Status s = rdb->Get(rocksdb::ReadOptions(),
617                                     vbh->defaultCFH.get(),
618                                     keySlice,
619                                     &value);
620        if (s.ok()) {
621            it.second.value =
622                    makeGetValue(vb, key, value, it.second.isMetaOnly);
623            GetValue* rv = &it.second.value;
624            for (auto& fetch : it.second.bgfetched_list) {
625                fetch->value = rv;
626            }
627        } else {
628            for (auto& fetch : it.second.bgfetched_list) {
629                fetch->value->setStatus(ENGINE_KEY_ENOENT);
630            }
631        }
632    }
633}
634
635void RocksDBKVStore::reset(uint16_t vbucketId) {
636    // TODO RDB:  Implement.
637}
638
639void RocksDBKVStore::del(const Item& item,
640                         Callback<TransactionContext, int>& cb) {
641    if (!item.isDeleted()) {
642        throw std::invalid_argument(
643                "RocksDBKVStore::del item to delete is not marked as deleted.");
644    }
645    if (!in_transaction) {
646        throw std::logic_error(
647                "RocksDBKVStore::del: in_transaction must be true to perform a "
648                "delete operation.");
649    }
650    // TODO: Deleted items remain as tombstones, but are not yet expired,
651    // they will accumuate forever.
652    MutationRequestCallback callback;
653    callback.delCb = &cb;
654    pendingReqs.push_back(std::make_unique<RocksRequest>(item, callback));
655}
656
657void RocksDBKVStore::delVBucket(uint16_t vbid, uint64_t vb_version) {
658    std::lock_guard<std::mutex> lg1(writeMutex);
659    std::lock_guard<std::mutex> lg2(vbhMutex);
660
661    if (!vbHandles[vbid]) {
662        logger.log(EXTENSION_LOG_WARNING,
663                   "RocksDBKVStore::delVBucket: VBucket not found, vb:%" PRIu16,
664                   vbid);
665        return;
666    }
667
668    // 'vbHandles' stores a shared_ptr to VBHandle for each VBucket . The
669    // ownership of each pointer is shared among multiple threads performing
670    // different operations (e.g., 'get' and 'commit').
671    // We want to call 'DropColumnFamily' here rather than in other threads
672    // because it is an expensive, IO-intensive operation and we do not want
673    // it to cause another thread (possibly a front-end one) from being blocked
674    // performing the drop.
675    // So, the thread executing 'delVBucket' spins until it is the exclusive
676    // owner of the shared_ptr (i.e., other concurrent threads like 'commit'
677    // have completed and do not own any copy of the shared_ptr).
678    {
679        std::shared_ptr<VBHandle> sharedPtr;
680        std::swap(vbHandles[vbid], sharedPtr);
681        while (!sharedPtr.unique()) {
682            std::this_thread::sleep_for(std::chrono::microseconds(100));
683        }
684        // Drop all the CF for vbid.
685        sharedPtr->dropColumnFamilies();
686    }
687
688    // The number of VBuckets has decreased, we need to re-balance the
689    // Memtables Quota among the CFs of existing VBuckets.
690    applyMemtablesQuota(lg2);
691}
692
693bool RocksDBKVStore::snapshotVBucket(uint16_t vbucketId,
694                                     const vbucket_state& vbstate,
695                                     VBStatePersist options) {
696    // TODO RDB: Refactor out behaviour common to this and CouchKVStore
697    auto start = ProcessClock::now();
698
699    if (updateCachedVBState(vbucketId, vbstate) &&
700        (options == VBStatePersist::VBSTATE_PERSIST_WITHOUT_COMMIT ||
701         options == VBStatePersist::VBSTATE_PERSIST_WITH_COMMIT)) {
702        const auto vbh = getVBHandle(vbucketId);
703        rocksdb::WriteBatch batch;
704        auto status = saveVBStateToBatch(*vbh, vbstate, batch);
705        if (!status.ok()) {
706            logger.log(EXTENSION_LOG_WARNING,
707                       "RocksDBKVStore::snapshotVBucket: saveVBStateToBatch() "
708                       "failed state:%s vb:%" PRIu16 " :%s",
709                       VBucket::toString(vbstate.state),
710                       vbucketId,
711                       status.getState());
712            return false;
713        }
714        status = rdb->Write(writeOptions, &batch);
715        if (!status.ok()) {
716            logger.log(EXTENSION_LOG_WARNING,
717                       "RocksDBKVStore::snapshotVBucket: Write() "
718                       "failed state:%s vb:%" PRIu16 " :%s",
719                       VBucket::toString(vbstate.state),
720                       vbucketId,
721                       status.getState());
722            return false;
723        }
724    }
725
726    LOG(EXTENSION_LOG_DEBUG,
727        "RocksDBKVStore::snapshotVBucket: Snapshotted vbucket:%" PRIu16
728        " state:%s",
729        vbucketId,
730        vbstate.toJSON().c_str());
731
732    st.snapshotHisto.add(std::chrono::duration_cast<std::chrono::microseconds>(
733            ProcessClock::now() - start));
734
735    return true;
736}
737
738bool RocksDBKVStore::snapshotStats(const std::map<std::string, std::string>&) {
739    // TODO RDB:  Implement
740    return true;
741}
742
743void RocksDBKVStore::destroyInvalidVBuckets(bool) {
744    // TODO RDB:  implement
745}
746
747size_t RocksDBKVStore::getNumShards() {
748    return configuration.getMaxShards();
749}
750
751bool RocksDBKVStore::getStat(const char* name_, size_t& value) {
752    std::string name(name_);
753
754    // Memory Usage
755    if (name == "kMemTableTotal") {
756        return getStatFromMemUsage(rocksdb::MemoryUtil::kMemTableTotal, value);
757    } else if (name == "kMemTableUnFlushed") {
758        return getStatFromMemUsage(rocksdb::MemoryUtil::kMemTableUnFlushed,
759                                   value);
760    } else if (name == "kTableReadersTotal") {
761        return getStatFromMemUsage(rocksdb::MemoryUtil::kTableReadersTotal,
762                                   value);
763    } else if (name == "kCacheTotal") {
764        return getStatFromMemUsage(rocksdb::MemoryUtil::kCacheTotal, value);
765    }
766
767    // MemTable Size per Column Famiy
768    else if (name == "default_kSizeAllMemTables") {
769        return getStatFromProperties(ColumnFamily::Default,
770                                     rocksdb::DB::Properties::kSizeAllMemTables,
771                                     value);
772    } else if (name == "seqno_kSizeAllMemTables") {
773        return getStatFromProperties(ColumnFamily::Seqno,
774                                     rocksdb::DB::Properties::kSizeAllMemTables,
775                                     value);
776    }
777
778    // Block Cache hit/miss
779    else if (name == "rocksdb.block.cache.hit") {
780        return getStatFromStatistics(rocksdb::Tickers::BLOCK_CACHE_HIT, value);
781    } else if (name == "rocksdb.block.cache.miss") {
782        return getStatFromStatistics(rocksdb::Tickers::BLOCK_CACHE_MISS, value);
783    } else if (name == "rocksdb.block.cache.data.hit") {
784        return getStatFromStatistics(rocksdb::Tickers::BLOCK_CACHE_DATA_HIT,
785                                     value);
786    } else if (name == "rocksdb.block.cache.data.miss") {
787        return getStatFromStatistics(rocksdb::Tickers::BLOCK_CACHE_DATA_MISS,
788                                     value);
789    } else if (name == "rocksdb.block.cache.index.hit") {
790        return getStatFromStatistics(rocksdb::Tickers::BLOCK_CACHE_INDEX_HIT,
791                                     value);
792    } else if (name == "rocksdb.block.cache.index.miss") {
793        return getStatFromStatistics(rocksdb::Tickers::BLOCK_CACHE_INDEX_MISS,
794                                     value);
795    } else if (name == "rocksdb.block.cache.filter.hit") {
796        return getStatFromStatistics(rocksdb::Tickers::BLOCK_CACHE_FILTER_HIT,
797                                     value);
798    } else if (name == "rocksdb.block.cache.filter.miss") {
799        return getStatFromStatistics(rocksdb::Tickers::BLOCK_CACHE_FILTER_MISS,
800                                     value);
801    }
802
803    // Disk Usage per Column Family
804    else if (name == "default_kTotalSstFilesSize") {
805        return getStatFromProperties(
806                ColumnFamily::Default,
807                rocksdb::DB::Properties::kTotalSstFilesSize,
808                value);
809    } else if (name == "seqno_kTotalSstFilesSize") {
810        return getStatFromProperties(
811                ColumnFamily::Seqno,
812                rocksdb::DB::Properties::kTotalSstFilesSize,
813                value);
814    }
815
816    // Scan stats
817    else if (name == "scan_totalSeqnoHits") {
818        value = scanTotalSeqnoHits.load();
819        return true;
820    } else if (name == "scan_oldSeqnoHits") {
821        value = scanOldSeqnoHits.load();
822        return true;
823    }
824
825    return false;
826}
827
828StorageProperties RocksDBKVStore::getStorageProperties(void) {
829    StorageProperties rv(StorageProperties::EfficientVBDump::Yes,
830                         StorageProperties::EfficientVBDeletion::Yes,
831                         StorageProperties::PersistedDeletion::No,
832                         // TODO RDB: Not strictly true, multiGet
833                         // does not yet use the underlying multi get
834                         // of RocksDB
835                         StorageProperties::EfficientGet::Yes,
836                         StorageProperties::ConcurrentWriteCompact::Yes);
837    return rv;
838}
839
840std::unordered_set<const rocksdb::Cache*> RocksDBKVStore::getCachePointers() {
841    std::unordered_set<const rocksdb::Cache*> cache_set;
842
843    // TODO: Cache from DBImpl. The 'std::shared_ptr<Cache>
844    // table_cache_' pointer is not exposed through the 'DB' interface
845
846    // Cache from DBOptions
847    // Note: we do not use the 'row_cache' currently.
848    cache_set.insert(rdb->GetDBOptions().row_cache.get());
849
850    // Cache from table factories.
851    addCFBlockCachePointers(defaultCFOptions, cache_set);
852    addCFBlockCachePointers(seqnoCFOptions, cache_set);
853
854    return cache_set;
855}
856
857void RocksDBKVStore::addCFBlockCachePointers(
858        const rocksdb::ColumnFamilyOptions& cfOptions,
859        std::unordered_set<const rocksdb::Cache*>& cache_set) {
860    if (cfOptions.table_factory) {
861        auto* table_options = cfOptions.table_factory->GetOptions();
862        auto* bbt_options =
863                static_cast<rocksdb::BlockBasedTableOptions*>(table_options);
864        cache_set.insert(bbt_options->block_cache.get());
865        cache_set.insert(bbt_options->block_cache_compressed.get());
866    }
867}
868
869rocksdb::StatsLevel RocksDBKVStore::getStatsLevel(
870        const std::string& stats_level) {
871    if (stats_level == "kExceptDetailedTimers") {
872        return rocksdb::StatsLevel::kExceptDetailedTimers;
873    } else if (stats_level == "kExceptTimeForMutex") {
874        return rocksdb::StatsLevel::kExceptTimeForMutex;
875    } else if (stats_level == "kAll") {
876        return rocksdb::StatsLevel::kAll;
877    } else {
878        throw std::invalid_argument(
879                std::string("RocksDBKVStore::getStatsLevel: stats_level: '") +
880                stats_level + std::string("'"));
881    }
882}
883
884rocksdb::Slice RocksDBKVStore::getKeySlice(const DocKey& key) {
885    return rocksdb::Slice(reinterpret_cast<const char*>(key.data()),
886                          key.size());
887}
888
889rocksdb::Slice RocksDBKVStore::getSeqnoSlice(const int64_t* seqno) {
890    return rocksdb::Slice(reinterpret_cast<const char*>(seqno), sizeof(*seqno));
891}
892
893int64_t RocksDBKVStore::getNumericSeqno(const rocksdb::Slice& seqnoSlice) {
894    assert(seqnoSlice.size() == sizeof(int64_t));
895    int64_t seqno;
896    std::memcpy(&seqno, seqnoSlice.data(), seqnoSlice.size());
897    return seqno;
898}
899
900std::unique_ptr<Item> RocksDBKVStore::makeItem(uint16_t vb,
901                                               const DocKey& key,
902                                               const rocksdb::Slice& s,
903                                               GetMetaOnly getMetaOnly) {
904    assert(s.size() >= sizeof(rockskv::MetaData));
905
906    const char* data = s.data();
907
908    rockskv::MetaData meta;
909    std::memcpy(&meta, data, sizeof(meta));
910    data += sizeof(meta);
911
912    bool includeValue = getMetaOnly == GetMetaOnly::No && meta.valueSize;
913
914    auto item = std::make_unique<Item>(key,
915                                       meta.flags,
916                                       meta.exptime,
917                                       includeValue ? data : nullptr,
918                                       includeValue ? meta.valueSize : 0,
919                                       meta.datatype,
920                                       meta.cas,
921                                       meta.bySeqno,
922                                       vb,
923                                       meta.revSeqno);
924
925    if (meta.deleted) {
926        item->setDeleted();
927    }
928
929    return item;
930}
931
932GetValue RocksDBKVStore::makeGetValue(uint16_t vb,
933                                      const DocKey& key,
934                                      const std::string& value,
935                                      GetMetaOnly getMetaOnly) {
936    rocksdb::Slice sval(value);
937    return GetValue(
938            makeItem(vb, key, sval, getMetaOnly), ENGINE_SUCCESS, -1, 0);
939}
940
941void RocksDBKVStore::readVBState(const VBHandle& vbh) {
942    // Largely copied from CouchKVStore
943    // TODO RDB: refactor out sections common to CouchKVStore
944    vbucket_state_t state = vbucket_state_dead;
945    uint64_t checkpointId = 0;
946    uint64_t maxDeletedSeqno = 0;
947    int64_t highSeqno = readHighSeqnoFromDisk(vbh);
948    std::string failovers;
949    uint64_t purgeSeqno = 0;
950    uint64_t lastSnapStart = 0;
951    uint64_t lastSnapEnd = 0;
952    uint64_t maxCas = 0;
953    int64_t hlcCasEpochSeqno = HlcCasSeqnoUninitialised;
954    bool mightContainXattrs = false;
955
956    auto key = getVbstateKey();
957    std::string vbstate;
958    auto vbid = vbh.vbid;
959    auto status = rdb->Get(rocksdb::ReadOptions(),
960                           vbh.seqnoCFH.get(),
961                           getSeqnoSlice(&key),
962                           &vbstate);
963    if (!status.ok()) {
964        if (status.IsNotFound()) {
965            logger.log(EXTENSION_LOG_NOTICE,
966                       "RocksDBKVStore::readVBState: '_local/vbstate.%" PRIu16
967                       "' not found",
968                       vbid);
969        } else {
970            logger.log(EXTENSION_LOG_WARNING,
971                       "RocksDBKVStore::readVBState: error getting vbstate "
972                       "error:%s, vb:%" PRIu16,
973                       status.getState(),
974                       vbid);
975        }
976    } else {
977        cJSON* jsonObj = cJSON_Parse(vbstate.c_str());
978        if (!jsonObj) {
979            logger.log(EXTENSION_LOG_WARNING,
980                       "RocksKVStore::readVBState: Failed to parse the vbstat "
981                       "json doc for vb:%" PRIu16 ", json:%s",
982                       vbid,
983                       vbstate.c_str());
984        }
985
986        const std::string vb_state =
987                getJSONObjString(cJSON_GetObjectItem(jsonObj, "state"));
988        const std::string checkpoint_id =
989                getJSONObjString(cJSON_GetObjectItem(jsonObj, "checkpoint_id"));
990        const std::string max_deleted_seqno = getJSONObjString(
991                cJSON_GetObjectItem(jsonObj, "max_deleted_seqno"));
992        const std::string snapStart =
993                getJSONObjString(cJSON_GetObjectItem(jsonObj, "snap_start"));
994        const std::string snapEnd =
995                getJSONObjString(cJSON_GetObjectItem(jsonObj, "snap_end"));
996        const std::string maxCasValue =
997                getJSONObjString(cJSON_GetObjectItem(jsonObj, "max_cas"));
998        const std::string hlcCasEpoch =
999                getJSONObjString(cJSON_GetObjectItem(jsonObj, "hlc_epoch"));
1000        mightContainXattrs = getJSONObjBool(
1001                cJSON_GetObjectItem(jsonObj, "might_contain_xattrs"));
1002
1003        cJSON* failover_json = cJSON_GetObjectItem(jsonObj, "failover_table");
1004        if (vb_state.compare("") == 0 || checkpoint_id.compare("") == 0 ||
1005            max_deleted_seqno.compare("") == 0) {
1006            logger.log(EXTENSION_LOG_WARNING,
1007                       "RocksDBKVStore::readVBState: State"
1008                       " JSON doc for vb:%" PRIu16
1009                       " is in the wrong format:%s, "
1010                       "vb state:%s, checkpoint id:%s and max deleted seqno:%s",
1011                       vbid,
1012                       vbstate.c_str(),
1013                       vb_state.c_str(),
1014                       checkpoint_id.c_str(),
1015                       max_deleted_seqno.c_str());
1016        } else {
1017            state = VBucket::fromString(vb_state.c_str());
1018            maxDeletedSeqno = std::stoull(max_deleted_seqno);
1019            checkpointId = std::stoull(checkpoint_id);
1020
1021            if (snapStart.compare("") == 0) {
1022                lastSnapStart = highSeqno;
1023            } else {
1024                lastSnapStart = std::stoull(snapStart.c_str());
1025            }
1026
1027            if (snapEnd.compare("") == 0) {
1028                lastSnapEnd = highSeqno;
1029            } else {
1030                lastSnapEnd = std::stoull(snapEnd.c_str());
1031            }
1032
1033            if (maxCasValue.compare("") != 0) {
1034                maxCas = std::stoull(maxCasValue.c_str());
1035            }
1036
1037            if (!hlcCasEpoch.empty()) {
1038                hlcCasEpochSeqno = std::stoull(hlcCasEpoch);
1039            }
1040
1041            if (failover_json) {
1042                failovers = to_string(failover_json, false);
1043            }
1044        }
1045        cJSON_Delete(jsonObj);
1046    }
1047
1048    cachedVBStates[vbh.vbid] =
1049            std::make_unique<vbucket_state>(state,
1050                                            checkpointId,
1051                                            maxDeletedSeqno,
1052                                            highSeqno,
1053                                            purgeSeqno,
1054                                            lastSnapStart,
1055                                            lastSnapEnd,
1056                                            maxCas,
1057                                            hlcCasEpochSeqno,
1058                                            mightContainXattrs,
1059                                            failovers);
1060}
1061
1062rocksdb::Status RocksDBKVStore::saveVBStateToBatch(const VBHandle& vbh,
1063                                                   const vbucket_state& vbState,
1064                                                   rocksdb::WriteBatch& batch) {
1065    std::stringstream jsonState;
1066
1067    jsonState << "{\"state\": \"" << VBucket::toString(vbState.state) << "\""
1068              << ",\"checkpoint_id\": \"" << vbState.checkpointId << "\""
1069              << ",\"max_deleted_seqno\": \"" << vbState.maxDeletedSeqno
1070              << "\"";
1071    if (!vbState.failovers.empty()) {
1072        jsonState << ",\"failover_table\": " << vbState.failovers;
1073    }
1074    jsonState << ",\"snap_start\": \"" << vbState.lastSnapStart << "\""
1075              << ",\"snap_end\": \"" << vbState.lastSnapEnd << "\""
1076              << ",\"max_cas\": \"" << vbState.maxCas << "\""
1077              << ",\"hlc_epoch\": \"" << vbState.hlcCasEpochSeqno << "\"";
1078
1079    if (vbState.mightContainXattrs) {
1080        jsonState << ",\"might_contain_xattrs\": true";
1081    } else {
1082        jsonState << ",\"might_contain_xattrs\": false";
1083    }
1084
1085    jsonState << "}";
1086
1087    auto key = getVbstateKey();
1088    rocksdb::Slice keySlice = getSeqnoSlice(&key);
1089    return batch.Put(vbh.seqnoCFH.get(), keySlice, jsonState.str());
1090}
1091
1092rocksdb::ColumnFamilyOptions RocksDBKVStore::getBaselineDefaultCFOptions() {
1093    rocksdb::ColumnFamilyOptions cfOptions;
1094    // Enable Point Lookup Optimization for the 'default' Column Family
1095    // Note: whatever we give in input as 'block_cache_size_mb', the Block
1096    // Cache will be reset with the shared 'blockCache' of size
1097    // 'rocksdb_block_cache_size'
1098    cfOptions.OptimizeForPointLookup(1);
1099    return cfOptions;
1100}
1101
1102rocksdb::ColumnFamilyOptions RocksDBKVStore::getBaselineSeqnoCFOptions() {
1103    rocksdb::ColumnFamilyOptions cfOptions;
1104    cfOptions.comparator = &seqnoComparator;
1105    return cfOptions;
1106}
1107
1108void RocksDBKVStore::applyUserCFOptions(rocksdb::ColumnFamilyOptions& cfOptions,
1109                                        const std::string& newCfOptions,
1110                                        const std::string& newBbtOptions) {
1111    // Apply 'newCfOptions' on top of 'cfOptions'
1112    auto status = rocksdb::GetColumnFamilyOptionsFromString(
1113            cfOptions, newCfOptions, &cfOptions);
1114    if (!status.ok()) {
1115        throw std::invalid_argument(
1116                std::string("RocksDBKVStore::applyUserCFOptions:  "
1117                            "GetColumnFamilyOptionsFromString error: ") +
1118                status.getState());
1119    }
1120
1121    // RocksDB ColumnFamilyOptions provide advanced options for the
1122    // Block Based Table file format, which is the default format for SST files.
1123    // Apply 'newBbtOptions' on top of the current BlockBasedTableOptions of
1124    // 'cfOptions'
1125    rocksdb::BlockBasedTableOptions baseOptions;
1126    if (cfOptions.table_factory) {
1127        auto* bbtOptions = cfOptions.table_factory->GetOptions();
1128        if (bbtOptions) {
1129            baseOptions = *(
1130                    static_cast<rocksdb::BlockBasedTableOptions*>(bbtOptions));
1131        }
1132    }
1133
1134    rocksdb::BlockBasedTableOptions tableOptions;
1135    status = rocksdb::GetBlockBasedTableOptionsFromString(
1136            baseOptions, newBbtOptions, &tableOptions);
1137    if (!status.ok()) {
1138        throw std::invalid_argument(
1139                std::string("RocksDBKVStore::applyUserCFOptions: "
1140                            "GetBlockBasedTableOptionsFromString error: ") +
1141                status.getState());
1142    }
1143
1144    // If using Partitioned Filters, then use the RocksDB recommended params
1145    // (https://github.com/facebook/rocksdb/blob/master/include/rocksdb/filter_policy.h#L133):
1146    //     "bits_per_key: bits per key in bloom filter. A good value for
1147    //           bits_per_key is 10, which yields a filter with ~1% false
1148    //           positive rate.
1149    //       use_block_based_builder: use block based filter rather than full
1150    //           filter. If you want to build a full filter, it needs to be
1151    //           set to false."
1152    if (tableOptions.partition_filters == true) {
1153        tableOptions.filter_policy.reset(
1154                rocksdb::NewBloomFilterPolicy(10, false));
1155    }
1156
1157    // Always use the per-shard shared Block Cache. If it is nullptr, RocksDB
1158    // will allocate a default size Block Cache.
1159    tableOptions.block_cache = blockCache;
1160
1161    // Set the new BlockBasedTableOptions
1162    cfOptions.table_factory.reset(
1163            rocksdb::NewBlockBasedTableFactory(tableOptions));
1164
1165    // Set the user-provided size amplification factor if under Universal
1166    // Compaction
1167    if (cfOptions.compaction_style ==
1168        rocksdb::CompactionStyle::kCompactionStyleUniversal) {
1169        auto& configuration =
1170                dynamic_cast<RocksDBKVStoreConfig&>(this->configuration);
1171        cfOptions.compaction_options_universal.max_size_amplification_percent =
1172                configuration.getUCMaxSizeAmplificationPercent();
1173    }
1174}
1175
1176rocksdb::Status RocksDBKVStore::writeAndTimeBatch(rocksdb::WriteBatch batch) {
1177    auto begin = ProcessClock::now();
1178    auto status = rdb->Write(writeOptions, &batch);
1179    st.commitHisto.add(std::chrono::duration_cast<std::chrono::microseconds>(
1180            ProcessClock::now() - begin));
1181    return status;
1182}
1183
1184rocksdb::Status RocksDBKVStore::saveDocs(
1185        uint16_t vbid,
1186        const Item* collectionsManifest,
1187        const std::vector<std::unique_ptr<RocksRequest>>& commitBatch) {
1188    auto reqsSize = commitBatch.size();
1189    if (reqsSize == 0) {
1190        st.docsCommitted = 0;
1191        return rocksdb::Status::OK();
1192    }
1193
1194    auto& vbstate = cachedVBStates[vbid];
1195    if (vbstate == nullptr) {
1196        throw std::logic_error("RocksDBKVStore::saveDocs: cachedVBStates[" +
1197                               std::to_string(vbid) + "] is NULL");
1198    }
1199
1200    rocksdb::Status status;
1201    int64_t maxDBSeqno = 0;
1202    rocksdb::WriteBatch batch;
1203
1204    const auto vbh = getVBHandle(vbid);
1205
1206    for (const auto& request : commitBatch) {
1207        int64_t bySeqno = request->getDocMeta().bySeqno;
1208        maxDBSeqno = std::max(maxDBSeqno, bySeqno);
1209
1210        status = addRequestToWriteBatch(*vbh, batch, request.get());
1211        if (!status.ok()) {
1212            logger.log(EXTENSION_LOG_WARNING,
1213                       "RocksDBKVStore::saveDocs: addRequestToWriteBatch "
1214                       "error:%d, vb:%" PRIu16,
1215                       status.code(),
1216                       vbid);
1217            return status;
1218        }
1219
1220        // Check if we should split into a new writeBatch if the batch size
1221        // exceeds the write_buffer_size - this is necessary because we
1222        // don't want our WriteBatch to exceed the configured memtable size, as
1223        // that can cause significant memory bloating (see MB-26521).
1224        // Note the limit check is only approximate, as the batch contains
1225        // updates for at least 2 CFs (key & seqno) which will be written into
1226        // separate memtables, so we don't exactly know the size contribution
1227        // to each memtable in the batch.
1228        const auto batchLimit = defaultCFOptions.write_buffer_size +
1229                                seqnoCFOptions.write_buffer_size;
1230        if (batch.GetDataSize() > batchLimit) {
1231            status = writeAndTimeBatch(batch);
1232            if (!status.ok()) {
1233                logger.log(EXTENSION_LOG_WARNING,
1234                           "RocksDBKVStore::saveDocs: rocksdb::DB::Write "
1235                           "error:%d, "
1236                           "vb:%" PRIu16,
1237                           status.code(),
1238                           vbid);
1239                return status;
1240            }
1241            batch.Clear();
1242        }
1243    }
1244
1245    status = saveVBStateToBatch(*vbh, *vbstate, batch);
1246    if (!status.ok()) {
1247        logger.log(EXTENSION_LOG_WARNING,
1248                   "RocksDBKVStore::saveDocs: saveVBStateToBatch error:%d",
1249                   status.code());
1250        return status;
1251    }
1252
1253    status = writeAndTimeBatch(batch);
1254    if (!status.ok()) {
1255        logger.log(EXTENSION_LOG_WARNING,
1256                   "RocksDBKVStore::saveDocs: rocksdb::DB::Write error:%d, "
1257                   "vb:%" PRIu16,
1258                   status.code(),
1259                   vbid);
1260        return status;
1261    }
1262
1263    st.batchSize.add(reqsSize);
1264    st.docsCommitted = reqsSize;
1265
1266    // Update high seqno
1267    vbstate->highSeqno = maxDBSeqno;
1268
1269    return rocksdb::Status::OK();
1270}
1271
1272rocksdb::Status RocksDBKVStore::addRequestToWriteBatch(
1273        const VBHandle& vbh,
1274        rocksdb::WriteBatch& batch,
1275        RocksRequest* request) {
1276    uint16_t vbid = request->getVBucketId();
1277
1278    rocksdb::Slice keySlice = getKeySlice(request->getKey());
1279    rocksdb::SliceParts keySliceParts(&keySlice, 1);
1280
1281    rocksdb::Slice docSlices[] = {request->getDocMetaSlice(),
1282                                  request->getDocBodySlice()};
1283    rocksdb::SliceParts valueSliceParts(docSlices, 2);
1284
1285    rocksdb::Slice bySeqnoSlice = getSeqnoSlice(&request->getDocMeta().bySeqno);
1286    // We use the `saveDocsHisto` to track the time spent on
1287    // `rocksdb::WriteBatch::Put()`.
1288    auto begin = ProcessClock::now();
1289    auto status =
1290            batch.Put(vbh.defaultCFH.get(), keySliceParts, valueSliceParts);
1291    if (!status.ok()) {
1292        logger.log(EXTENSION_LOG_WARNING,
1293                   "RocksDBKVStore::saveDocs: rocksdb::WriteBatch::Put "
1294                   "[ColumnFamily: \'default\']  error:%d, "
1295                   "vb:%" PRIu16,
1296                   status.code(),
1297                   vbid);
1298        return status;
1299    }
1300    status = batch.Put(vbh.seqnoCFH.get(), bySeqnoSlice, keySlice);
1301    if (!status.ok()) {
1302        logger.log(EXTENSION_LOG_WARNING,
1303                   "RocksDBKVStore::saveDocs: rocksdb::WriteBatch::Put "
1304                   "[ColumnFamily: \'seqno\']  error:%d, "
1305                   "vb:%" PRIu16,
1306                   status.code(),
1307                   vbid);
1308        return status;
1309    }
1310    st.saveDocsHisto.add(std::chrono::duration_cast<std::chrono::microseconds>(
1311            ProcessClock::now() - begin));
1312
1313    return rocksdb::Status::OK();
1314}
1315
1316int64_t RocksDBKVStore::readHighSeqnoFromDisk(const VBHandle& vbh) {
1317    std::unique_ptr<rocksdb::Iterator> it(
1318            rdb->NewIterator(rocksdb::ReadOptions(), vbh.seqnoCFH.get()));
1319
1320    // Seek to the highest seqno=>key mapping stored for the vbid
1321    auto maxSeqno = std::numeric_limits<int64_t>::max();
1322    rocksdb::Slice maxSeqnoSlice = getSeqnoSlice(&maxSeqno);
1323    it->SeekForPrev(maxSeqnoSlice);
1324
1325    if (!it->Valid()) {
1326        return 0;
1327    }
1328    auto highSeqno = getNumericSeqno(it->key());
1329    // We use a negative seqno as key for VBState. Do not consider it.
1330    return highSeqno >= 0 ? highSeqno : 0;
1331}
1332
1333int64_t RocksDBKVStore::getVbstateKey() {
1334    // We put the VBState into the SeqnoCF. As items in the SeqnoCF are ordered
1335    // by increasing-seqno, we reserve a negative special key to VBState so
1336    // that we can access it in O(1).
1337    return -9999;
1338}
1339
1340ScanContext* RocksDBKVStore::initScanContext(
1341        std::shared_ptr<StatusCallback<GetValue>> cb,
1342        std::shared_ptr<StatusCallback<CacheLookup>> cl,
1343        uint16_t vbid,
1344        uint64_t startSeqno,
1345        DocumentFilter options,
1346        ValueFilter valOptions) {
1347    size_t scanId = scanCounter++;
1348
1349    {
1350        std::lock_guard<std::mutex> lg(scanSnapshotsMutex);
1351        scanSnapshots.emplace(scanId, SnapshotPtr(rdb->GetSnapshot(), *rdb));
1352    }
1353
1354    // As we cannot efficiently determine how many documents this scan will
1355    // find, we approximate this value with the seqno difference + 1
1356    // as scan is supposed to be inclusive at both ends,
1357    // seqnos 2 to 4 covers 3 docs not 4 - 2 = 2
1358
1359    uint64_t endSeqno = cachedVBStates[vbid]->highSeqno;
1360    return new ScanContext(cb,
1361                           cl,
1362                           vbid,
1363                           scanId,
1364                           startSeqno,
1365                           endSeqno,
1366                           0, /*TODO RDB: pass the real purge-seqno*/
1367                           options,
1368                           valOptions,
1369                           /* documentCount */ endSeqno - startSeqno + 1,
1370                           configuration);
1371}
1372
1373scan_error_t RocksDBKVStore::scan(ScanContext* ctx) {
1374    if (!ctx) {
1375        return scan_failed;
1376    }
1377
1378    if (ctx->lastReadSeqno == ctx->maxSeqno) {
1379        return scan_success;
1380    }
1381
1382    auto startSeqno = ctx->startSeqno;
1383    if (ctx->lastReadSeqno != 0) {
1384        startSeqno = ctx->lastReadSeqno + 1;
1385    }
1386
1387    TRACE_EVENT2("RocksDBKVStore",
1388                 "scan",
1389                 "vbid",
1390                 ctx->vbid,
1391                 "startSeqno",
1392                 startSeqno);
1393
1394    GetMetaOnly isMetaOnly = ctx->valFilter == ValueFilter::KEYS_ONLY
1395                                     ? GetMetaOnly::Yes
1396                                     : GetMetaOnly::No;
1397
1398    rocksdb::ReadOptions snapshotOpts{rocksdb::ReadOptions()};
1399
1400    // Lock for safe access to the scanSnapshots map and to ensure the snapshot
1401    // doesn't get destroyed whilst we have the pointer.
1402    // @todo use a shared_ptr and reduce the lock scope to just the map::at call
1403    std::lock_guard<std::mutex> lg(scanSnapshotsMutex);
1404    snapshotOpts.snapshot = scanSnapshots.at(ctx->scanId).get();
1405
1406    rocksdb::Slice startSeqnoSlice = getSeqnoSlice(&startSeqno);
1407    const auto vbh = getVBHandle(ctx->vbid);
1408    std::unique_ptr<rocksdb::Iterator> it(
1409            rdb->NewIterator(snapshotOpts, vbh->seqnoCFH.get()));
1410    if (!it) {
1411        throw std::logic_error(
1412                "RocksDBKVStore::scan: rocksdb::Iterator to Seqno Column "
1413                "Family is nullptr");
1414    }
1415    it->Seek(startSeqnoSlice);
1416
1417    rocksdb::Slice endSeqnoSlice = getSeqnoSlice(&ctx->maxSeqno);
1418    auto isPastEnd = [&endSeqnoSlice, this](rocksdb::Slice seqSlice) {
1419        return seqnoComparator.Compare(seqSlice, endSeqnoSlice) == 1;
1420    };
1421
1422    for (; it->Valid() && !isPastEnd(it->key()); it->Next()) {
1423        scanTotalSeqnoHits++;
1424        auto seqno = getNumericSeqno(it->key());
1425        rocksdb::Slice keySlice = it->value();
1426        std::string valueStr;
1427        auto s = rdb->Get(
1428                snapshotOpts, vbh->defaultCFH.get(), keySlice, &valueStr);
1429
1430        if (!s.ok()) {
1431            // TODO RDB: Old seqnos are never removed from the db!
1432            // If the item does not exist (s.isNotFound())
1433            // the seqno => key mapping could be removed; not even
1434            // a tombstone remains of that item.
1435
1436            // Note: I account also the hits for deleted documents because it
1437            // is logically correct. But, we switch on the RocksDB built-in
1438            // Bloom Filter by default and we try to keep all the Filter blocks
1439            // in the BlockCache. So, I expect that the impact of old-seqno hits
1440            // is minimum in this case.
1441            scanOldSeqnoHits++;
1442
1443            continue;
1444        }
1445
1446        rocksdb::Slice valSlice(valueStr);
1447
1448        // TODO RDB: Deal with collections
1449        DocKey key(reinterpret_cast<const uint8_t*>(keySlice.data()),
1450                   keySlice.size(),
1451                   DocNamespace::DefaultCollection);
1452
1453        std::unique_ptr<Item> itm =
1454                makeItem(ctx->vbid, key, valSlice, isMetaOnly);
1455
1456        if (itm->getBySeqno() > seqno) {
1457            // TODO RDB: Old seqnos are never removed from the db!
1458            // If the item has a newer seqno now, the stale
1459            // seqno => key mapping could be removed
1460            scanOldSeqnoHits++;
1461            continue;
1462        } else if (itm->getBySeqno() < seqno) {
1463            throw std::logic_error(
1464                    "RocksDBKVStore::scan: index has a higher seqno"
1465                    "than the document in a snapshot!");
1466        }
1467
1468        bool includeDeletes =
1469                (ctx->docFilter == DocumentFilter::NO_DELETES) ? false : true;
1470        bool onlyKeys =
1471                (ctx->valFilter == ValueFilter::KEYS_ONLY) ? true : false;
1472
1473        if (!includeDeletes && itm->isDeleted()) {
1474            continue;
1475        }
1476        int64_t byseqno = itm->getBySeqno();
1477        CacheLookup lookup(key,
1478                           byseqno,
1479                           ctx->vbid,
1480                           ctx->collectionsContext.getSeparator());
1481        ctx->lookup->callback(lookup);
1482
1483        int status = ctx->lookup->getStatus();
1484
1485        if (status == ENGINE_KEY_EEXISTS) {
1486            ctx->lastReadSeqno = byseqno;
1487            continue;
1488        } else if (status == ENGINE_ENOMEM) {
1489            return scan_again;
1490        }
1491
1492        GetValue rv(std::move(itm), ENGINE_SUCCESS, -1, onlyKeys);
1493        ctx->callback->callback(rv);
1494        status = ctx->callback->getStatus();
1495
1496        if (status == ENGINE_ENOMEM) {
1497            return scan_again;
1498        }
1499
1500        ctx->lastReadSeqno = byseqno;
1501    }
1502
1503    cb_assert(it->status().ok()); // Check for any errors found during the scan
1504
1505    return scan_success;
1506}
1507
1508void RocksDBKVStore::destroyScanContext(ScanContext* ctx) {
1509    if (ctx == nullptr) {
1510        return;
1511    }
1512    std::lock_guard<std::mutex> lg(scanSnapshotsMutex);
1513    // TODO RDB: Might be nice to have the snapshot in the ctx and
1514    // release it on destruction
1515    auto it = scanSnapshots.find(ctx->scanId);
1516    if (it != scanSnapshots.end()) {
1517        scanSnapshots.erase(it);
1518    }
1519    delete ctx;
1520}
1521
1522bool RocksDBKVStore::getStatFromMemUsage(
1523        const rocksdb::MemoryUtil::UsageType type, size_t& value) {
1524    std::vector<rocksdb::DB*> dbs = {rdb.get()};
1525    auto cache_set = getCachePointers();
1526    std::map<rocksdb::MemoryUtil::UsageType, uint64_t> usageByType;
1527
1528    auto status = rocksdb::MemoryUtil::GetApproximateMemoryUsageByType(
1529            dbs, cache_set, &usageByType);
1530    if (!status.ok()) {
1531        logger.log(EXTENSION_LOG_NOTICE,
1532                   "RocksDBKVStore::getStatFromMemUsage: "
1533                   "GetApproximateMemoryUsageByType error: %s",
1534                   status.getState());
1535        return false;
1536    }
1537
1538    value = usageByType.at(type);
1539
1540    return true;
1541}
1542
1543bool RocksDBKVStore::getStatFromStatistics(const rocksdb::Tickers ticker,
1544                                           size_t& value) {
1545    const auto statistics = rdb->GetDBOptions().statistics;
1546    if (!statistics) {
1547        return false;
1548    }
1549    value = statistics->getTickerCount(ticker);
1550    return true;
1551}
1552
1553bool RocksDBKVStore::getStatFromProperties(ColumnFamily cf,
1554                                           const std::string& property,
1555                                           size_t& value) {
1556    value = 0;
1557    std::lock_guard<std::mutex> lg(vbhMutex);
1558    for (const auto vbh : vbHandles) {
1559        if (vbh) {
1560            rocksdb::ColumnFamilyHandle* cfh = nullptr;
1561            switch (cf) {
1562            case ColumnFamily::Default:
1563                cfh = vbh->defaultCFH.get();
1564                break;
1565            case ColumnFamily::Seqno:
1566                cfh = vbh->seqnoCFH.get();
1567                break;
1568            }
1569            if (!cfh) {
1570                return false;
1571            }
1572            std::string out;
1573            if (!rdb->GetProperty(cfh, property, &out)) {
1574                return false;
1575            }
1576            value += std::stoull(out);
1577        }
1578    }
1579
1580    return true;
1581}
1582
1583// As we implement a VBucket as a pair of two Column Families (a 'default' CF
1584// and a 'local+seqno' CF), we need to re-set the 'write_buffer_size' for each
1585// CF when the number of VBuckets managed by the current store changes. The
1586// goal is to keep the total allocation for all the Memtables under the
1587// 'rocksdb_memtables_ratio' given in configuration.
1588// Thus, this function performs the following basic steps:
1589//     1) Re-calculate the new sizes of all Memtables;
1590//     2) Apply the new sizes.
1591// We apply the new sizes using the rocksdb::DB::SetOptions() API. The
1592// 'write_buffer_size' is a dynamically changeable option. This call changes
1593// the size of mutable Memtables instantly. If the new size is below the
1594// current allocation for the Memtable, the next key-value pair added will mark
1595// the Memtable as immutable and will trigger a flush.
1596void RocksDBKVStore::applyMemtablesQuota(
1597        const std::lock_guard<std::mutex>& lock) {
1598    const auto vbuckets = getVBucketsCount(lock);
1599
1600    auto& configuration =
1601            dynamic_cast<RocksDBKVStoreConfig&>(this->configuration);
1602
1603    // 1) If configuration.getMemtablesRatio() == 0.0, then
1604    //      we just want to use the baseline write_buffer_size.
1605    // 2) If vbuckets == 0, then there is no Memtable (this happens only
1606    //      when the underlying RocksDB instance has just been created).
1607    // On both cases the following logic does not apply, so the
1608    // write_buffer_size for both the 'default' and the 'seqno' CFs is left
1609    // to the baseline value.
1610    if (configuration.getMemtablesRatio() > 0.0 && vbuckets > 0) {
1611        const auto memtablesQuota = configuration.getBucketQuota() /
1612                                    configuration.getMaxShards() *
1613                                    configuration.getMemtablesRatio();
1614        // TODO: for now I am hard-coding the percentage of Memtables Quota
1615        // that we allocate for the 'deafult' (90%) and 'seqno' (10%) CFs. The
1616        // plan is to expose this percentage as a configuration parameter in a
1617        // follow-up patch.
1618        const auto defaultCFMemtablesQuota = memtablesQuota * 0.9;
1619        const auto seqnoCFMemtablesQuota =
1620                memtablesQuota - defaultCFMemtablesQuota;
1621
1622        // Set the the write_buffer_size for the 'default' CF
1623        defaultCFOptions.write_buffer_size =
1624                defaultCFMemtablesQuota / vbuckets /
1625                defaultCFOptions.max_write_buffer_number;
1626        // Set the write_buffer_size for the 'seqno' CF
1627        seqnoCFOptions.write_buffer_size =
1628                seqnoCFMemtablesQuota / vbuckets /
1629                seqnoCFOptions.max_write_buffer_number;
1630
1631        // Apply the new write_buffer_size
1632        const std::unordered_map<std::string, std::string>
1633                newDefaultCFWriteBufferSize{std::make_pair(
1634                        "write_buffer_size",
1635                        std::to_string(defaultCFOptions.write_buffer_size))};
1636        const std::unordered_map<std::string, std::string>
1637                newSeqnoCFWriteBufferSize{std::make_pair(
1638                        "write_buffer_size",
1639                        std::to_string(seqnoCFOptions.write_buffer_size))};
1640        for (const auto& vbh : vbHandles) {
1641            if (vbh) {
1642                auto status = rdb->SetOptions(vbh->defaultCFH.get(),
1643                                              newDefaultCFWriteBufferSize);
1644                if (!status.ok()) {
1645                    throw std::runtime_error(
1646                            "RocksDBKVStore::applyMemtablesQuota: SetOptions "
1647                            "failed for [vbid: " +
1648                            std::to_string(vbh->vbid) + ", CF: default]: " +
1649                            status.getState());
1650                }
1651                status = rdb->SetOptions(vbh->seqnoCFH.get(),
1652                                         newSeqnoCFWriteBufferSize);
1653                if (!status.ok()) {
1654                    throw std::runtime_error(
1655                            "RocksDBKVStore::applyMemtablesQuota: SetOptions "
1656                            "failed for [vbid: " +
1657                            std::to_string(vbh->vbid) + ", CF: seqno]: " +
1658                            status.getState());
1659                }
1660            }
1661        }
1662    }
1663
1664    // Overwrite Compaction options if Compaction Optimization is enabled
1665    // for the 'default' CF
1666    if (configuration.getDefaultCfOptimizeCompaction() == "level") {
1667        defaultCFOptions.OptimizeLevelStyleCompaction(
1668                defaultCFOptions.write_buffer_size);
1669    } else if (configuration.getDefaultCfOptimizeCompaction() == "universal") {
1670        defaultCFOptions.OptimizeUniversalStyleCompaction(
1671                defaultCFOptions.write_buffer_size);
1672    }
1673    // Overwrite Compaction options if Compaction Optimization is enabled
1674    // for the 'seqno' CF
1675    if (configuration.getSeqnoCfOptimizeCompaction() == "level") {
1676        seqnoCFOptions.OptimizeLevelStyleCompaction(
1677                seqnoCFOptions.write_buffer_size);
1678    } else if (configuration.getSeqnoCfOptimizeCompaction() == "universal") {
1679        seqnoCFOptions.OptimizeUniversalStyleCompaction(
1680                seqnoCFOptions.write_buffer_size);
1681    }
1682}
1683
1684size_t RocksDBKVStore::getVBucketsCount(
1685        const std::lock_guard<std::mutex>&) const {
1686    uint16_t count = 0;
1687    for (const auto& vbh : vbHandles) {
1688        if (vbh) {
1689            count++;
1690        }
1691    }
1692    return count;
1693}
1694