1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2015 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "config.h"
19
20#ifdef _MSC_VER
21#define PATH_MAX MAX_PATH
22#endif
23
24#include <fcntl.h>
25#include <stdio.h>
26#include <string.h>
27#include <sys/stat.h>
28#include <sys/types.h>
29
#include <algorithm>
#include <cctype>
#include <cstdlib>
#include <fstream>
#include <iostream>
#include <list>
#include <map>
#include <memory>
#include <phosphor/phosphor.h>
#include <platform/cb_malloc.h>
#include <platform/checked_snprintf.h>
#include <string>
#include <utility>
#include <vector>
43#include <cJSON.h>
44#include <platform/dirutils.h>
45
46#include "common.h"
47#include "couch-kvstore/couch-kvstore.h"
48#include "ep_types.h"
49#include "kvstore_config.h"
50#include "statwriter.h"
51#include "vbucket.h"
52#include "vbucket_bgfetch_item.h"
53
54#include <JSON_checker.h>
55#include <kvstore.h>
56#include <platform/compress.h>
57
58extern "C" {
59    static int recordDbDumpC(Db *db, DocInfo *docinfo, void *ctx)
60    {
61        return CouchKVStore::recordDbDump(db, docinfo, ctx);
62    }
63}
64
65extern "C" {
66    static int getMultiCbC(Db *db, DocInfo *docinfo, void *ctx)
67    {
68        return CouchKVStore::getMultiCb(db, docinfo, ctx);
69    }
70}
71
72struct kvstats_ctx {
73    kvstats_ctx(bool persistDocNamespace)
74        : persistDocNamespace(persistDocNamespace) {
75    }
76    /// A map of key to bool. If true, the key exists in the VB datafile
77    std::unordered_map<StoredDocKey, bool> keyStats;
78    /// Collections: When enabled this means persisted keys have namespaces
79    bool persistDocNamespace;
80};
81
82static std::string getStrError(Db *db) {
83    const size_t max_msg_len = 256;
84    char msg[max_msg_len];
85    couchstore_last_os_error(db, msg, max_msg_len);
86    std::string errorStr(msg);
87    return errorStr;
88}
89
90/**
91 * Determine the datatype for a blob. It is _highly_ unlikely that
92 * this method is being called, as it would have to be for an item
93 * which is read off the disk _before_ we started to write the
94 * datatype to disk (we did that in a 3.x server).
95 *
96 * @param doc The document to check
97 * @return JSON or RAW bytes
98 */
99static protocol_binary_datatype_t determine_datatype(sized_buf doc) {
100    if (checkUTF8JSON(reinterpret_cast<uint8_t*>(doc.buf), doc.size)) {
101        return PROTOCOL_BINARY_DATATYPE_JSON;
102    } else {
103        return PROTOCOL_BINARY_RAW_BYTES;
104    }
105}
106
/**
 * Test whether a file name ends with the ".compact" suffix.
 *
 * @param filename name (or path) to examine
 * @return true only if the final characters of filename are ".compact"
 */
static bool endWithCompact(const std::string &filename) {
    static const char suffix[] = ".compact";
    // sizeof() counts the terminating NUL, which is not part of the suffix.
    const size_t suffixLen = sizeof(suffix) - 1;

    // Compare the tail directly so that an earlier ".compact" inside the
    // name cannot hide (or fake) a match at the end.
    return filename.size() >= suffixLen &&
           filename.compare(filename.size() - suffixLen, suffixLen, suffix) ==
                   0;
}
115
116static void discoverDbFiles(const std::string &dir,
117                            std::vector<std::string> &v) {
118    auto files = cb::io::findFilesContaining(dir, ".couch");
119    std::vector<std::string>::iterator ii;
120    for (ii = files.begin(); ii != files.end(); ++ii) {
121        if (!endWithCompact(*ii)) {
122            v.push_back(*ii);
123        }
124    }
125}
126
127static int getMutationStatus(couchstore_error_t errCode) {
128    switch (errCode) {
129    case COUCHSTORE_SUCCESS:
130        return MUTATION_SUCCESS;
131    case COUCHSTORE_ERROR_NO_HEADER:
132    case COUCHSTORE_ERROR_NO_SUCH_FILE:
133    case COUCHSTORE_ERROR_DOC_NOT_FOUND:
134        // this return causes ep engine to drop the failed flush
135        // of an item since it does not know about the itme any longer
136        return DOC_NOT_FOUND;
137    default:
138        // this return causes ep engine to keep requeuing the failed
139        // flush of an item
140        return MUTATION_FAILED;
141    }
142}
143
/**
 * Check that a string consists solely of decimal digit characters.
 *
 * @param input string to inspect (not modified)
 * @return true if every character is a digit; an empty string yields true
 */
static bool allDigit(std::string &input) {
    // The lambda takes unsigned char on purpose: passing a negative plain
    // char to isdigit() is undefined behaviour.
    return std::all_of(input.begin(), input.end(), [](unsigned char ch) {
        return std::isdigit(ch) != 0;
    });
}
153
154static std::string couchkvstore_strerrno(Db *db, couchstore_error_t err) {
155    switch (err) {
156    case COUCHSTORE_ERROR_OPEN_FILE:
157    case COUCHSTORE_ERROR_READ:
158    case COUCHSTORE_ERROR_WRITE:
159    case COUCHSTORE_ERROR_FILE_CLOSE:
160        return getStrError(db);
161
162    case COUCHSTORE_ERROR_CORRUPT:
163    case COUCHSTORE_ERROR_CHECKSUM_FAIL: {
164        char buffer[256];
165        couchstore_last_internal_error(db, buffer, sizeof(buffer));
166        return std::string(buffer);
167    }
168    default:
169        return "none";
170    }
171}
172
173static DocKey makeDocKey(const sized_buf buf, bool restoreNamespace) {
174    if (restoreNamespace) {
175        return DocKey(reinterpret_cast<const uint8_t*>(&buf.buf[1]),
176                      buf.size - 1,
177                      DocNamespace(buf.buf[0]));
178    } else {
179        return DocKey(reinterpret_cast<const uint8_t*>(buf.buf),
180                      buf.size,
181                      DocNamespace::DefaultCollection);
182    }
183}
184
185struct GetMultiCbCtx {
186    GetMultiCbCtx(CouchKVStore &c, uint16_t v, vb_bgfetch_queue_t &f) :
187        cks(c), vbId(v), fetches(f) {}
188
189    CouchKVStore &cks;
190    uint16_t vbId;
191    vb_bgfetch_queue_t &fetches;
192};
193
194struct StatResponseCtx {
195public:
196    StatResponseCtx(std::map<std::pair<uint16_t, uint16_t>, vbucket_state> &sm,
197                    uint16_t vb) : statMap(sm), vbId(vb) {
198        /* EMPTY */
199    }
200
201    std::map<std::pair<uint16_t, uint16_t>, vbucket_state> &statMap;
202    uint16_t vbId;
203};
204
205struct AllKeysCtx {
206    AllKeysCtx(std::shared_ptr<Callback<const DocKey&>> callback, uint32_t cnt)
207        : cb(callback), count(cnt) { }
208
209    std::shared_ptr<Callback<const DocKey&>> cb;
210    uint32_t count;
211};
212
213couchstore_content_meta_flags CouchRequest::getContentMeta(const Item& it) {
214    couchstore_content_meta_flags rval;
215
216    if (mcbp::datatype::is_json(it.getDataType())) {
217        rval = COUCH_DOC_IS_JSON;
218    } else {
219        rval = COUCH_DOC_NON_JSON_MODE;
220    }
221
222    if (it.getNBytes() > 0 && !mcbp::datatype::is_snappy(it.getDataType())) {
223        //Compress only if a value exists and is not already compressed
224        rval |= COUCH_DOC_IS_COMPRESSED;
225    }
226
227    return rval;
228}
229
230CouchRequest::CouchRequest(const Item& it,
231                           uint64_t rev,
232                           MutationRequestCallback& cb,
233                           bool del,
234                           bool persistDocNamespace)
235    : IORequest(it.getVBucketId(), cb, del, it.getKey()),
236      value(it.getValue()),
237      fileRevNum(rev) {
238    // Collections: TODO: Temporary switch to ensure upgrades don't break.
239    if (persistDocNamespace) {
240        dbDoc.id = {const_cast<char*>(reinterpret_cast<const char*>(
241                            key.getDocNameSpacedData())),
242                    it.getKey().getDocNameSpacedSize()};
243    } else {
244        dbDoc.id = {const_cast<char*>(key.c_str()), it.getKey().size()};
245    }
246
247    if (it.getNBytes()) {
248        dbDoc.data.buf = const_cast<char *>(value->getData());
249        dbDoc.data.size = it.getNBytes();
250    } else {
251        dbDoc.data.buf = NULL;
252        dbDoc.data.size = 0;
253    }
254    meta.setCas(it.getCas());
255    meta.setFlags(it.getFlags());
256    meta.setExptime(it.getExptime());
257    meta.setDataType(it.getDataType());
258
259    dbDocInfo.db_seq = it.getBySeqno();
260
261    // Now allocate space to hold the meta and get it ready for storage
262    dbDocInfo.rev_meta.size = MetaData::getMetaDataSize(MetaData::Version::V1);
263    dbDocInfo.rev_meta.buf = meta.prepareAndGetForPersistence();
264
265    dbDocInfo.rev_seq = it.getRevSeqno();
266    dbDocInfo.size = dbDoc.data.size;
267
268    if (del) {
269        dbDocInfo.deleted =  1;
270    } else {
271        dbDocInfo.deleted = 0;
272    }
273    dbDocInfo.id = dbDoc.id;
274    dbDocInfo.content_meta = getContentMeta(it);
275}
276
277CouchKVStore::CouchKVStore(KVStoreConfig& config)
278    : CouchKVStore(config, *couchstore_get_default_file_ops()) {
279}
280
281CouchKVStore::CouchKVStore(KVStoreConfig& config,
282                           FileOpsInterface& ops,
283                           bool readOnly,
284                           std::shared_ptr<RevisionMap> dbFileRevMap)
285    : KVStore(config, readOnly),
286      dbname(config.getDBName()),
287      dbFileRevMap(dbFileRevMap),
288      intransaction(false),
289      scanCounter(0),
290      logger(config.getLogger()),
291      base_ops(ops) {
292    createDataDir(dbname);
293    statCollectingFileOps = getCouchstoreStatsOps(st.fsStats, base_ops);
294    statCollectingFileOpsCompaction = getCouchstoreStatsOps(
295        st.fsStatsCompaction, base_ops);
296
297    // init db file map with default revision number, 1
298    numDbFiles = configuration.getMaxVBuckets();
299
300    // pre-allocate lookup maps (vectors) given we have a relatively
301    // small, fixed number of vBuckets.
302    cachedDocCount.assign(numDbFiles, Couchbase::RelaxedAtomic<size_t>(0));
303    cachedDeleteCount.assign(numDbFiles, Couchbase::RelaxedAtomic<size_t>(-1));
304    cachedFileSize.assign(numDbFiles, Couchbase::RelaxedAtomic<uint64_t>(0));
305    cachedSpaceUsed.assign(numDbFiles, Couchbase::RelaxedAtomic<uint64_t>(0));
306    cachedVBStates.resize(numDbFiles);
307
308    initialize();
309}
310
311CouchKVStore::CouchKVStore(KVStoreConfig& config, FileOpsInterface& ops)
312    : CouchKVStore(config,
313                   ops,
314                   false /*readonly*/,
315                   std::make_shared<RevisionMap>(config.getMaxVBuckets())) {
316}
317
318/**
319 * Make a read-only CouchKVStore from this object
320 */
321std::unique_ptr<CouchKVStore> CouchKVStore::makeReadOnlyStore() {
322    // Not using make_unique due to the private constructor we're calling
323    return std::unique_ptr<CouchKVStore>(
324            new CouchKVStore(configuration, dbFileRevMap));
325}
326
327CouchKVStore::CouchKVStore(KVStoreConfig& config,
328                           std::shared_ptr<RevisionMap> dbFileRevMap)
329    : CouchKVStore(config,
330                   *couchstore_get_default_file_ops(),
331                   true /*readonly*/,
332                   dbFileRevMap) {
333}
334
335void CouchKVStore::initialize() {
336    std::vector<uint16_t> vbids;
337    std::vector<std::string> files;
338    discoverDbFiles(dbname, files);
339    populateFileNameMap(files, &vbids);
340
341    couchstore_error_t errorCode;
342
343    std::vector<uint16_t>::iterator itr = vbids.begin();
344    for (; itr != vbids.end(); ++itr) {
345        uint16_t id = *itr;
346        DbHolder db(*this);
347        errorCode = openDB(id, db, COUCHSTORE_OPEN_FLAG_RDONLY);
348        if (errorCode == COUCHSTORE_SUCCESS) {
349            if (readVBState(db, id) == ReadVBStateStatus::Success) {
350                /* update stat */
351                ++st.numLoadedVb;
352            }
353        } else {
354            logger.log(EXTENSION_LOG_WARNING,
355                       "CouchKVStore::initialize: openDB"
356                       " error:%s, name:%s/%" PRIu16 ".couch.%" PRIu64,
357                       couchstore_strerror(errorCode),
358                       dbname.c_str(),
359                       id,
360                       db.getFileRev());
361            cachedVBStates[id] = NULL;
362        }
363
364        if (!isReadOnly()) {
365            removeCompactFile(dbname, id);
366        }
367    }
368}
369
370CouchKVStore::~CouchKVStore() {
371    close();
372}
373
374void CouchKVStore::reset(uint16_t vbucketId) {
375    if (isReadOnly()) {
376        throw std::logic_error("CouchKVStore::reset: Not valid on a read-only "
377                        "object.");
378    }
379
380    vbucket_state* state = getVBucketState(vbucketId);
381    if (state) {
382        state->reset();
383
384        cachedDocCount[vbucketId] = 0;
385        cachedDeleteCount[vbucketId] = 0;
386        cachedFileSize[vbucketId] = 0;
387        cachedSpaceUsed[vbucketId] = 0;
388
389        // Unlink the current revision and then increment it to ensure any
390        // pending delete doesn't delete us. Note that the expectation is that
391        // some higher level per VB lock is required to prevent data-races here.
392        // KVBucket::vb_mutexes is used in this case.
393        unlinkCouchFile(vbucketId, (*dbFileRevMap)[vbucketId]);
394        incrementRevision(vbucketId);
395
396        setVBucketState(
397                vbucketId, *state, VBStatePersist::VBSTATE_PERSIST_WITH_COMMIT);
398    } else {
399        throw std::invalid_argument("CouchKVStore::reset: No entry in cached "
400                        "states for vbucket " + std::to_string(vbucketId));
401    }
402}
403
404void CouchKVStore::set(const Item& itm,
405                       Callback<TransactionContext, mutation_result>& cb) {
406    if (isReadOnly()) {
407        throw std::logic_error("CouchKVStore::set: Not valid on a read-only "
408                        "object.");
409    }
410    if (!intransaction) {
411        throw std::invalid_argument("CouchKVStore::set: intransaction must be "
412                        "true to perform a set operation.");
413    }
414
415    bool deleteItem = false;
416    MutationRequestCallback requestcb;
417    uint64_t fileRev = (*dbFileRevMap)[itm.getVBucketId()];
418
419    // each req will be de-allocated after commit
420    requestcb.setCb = &cb;
421    CouchRequest* req =
422            new CouchRequest(itm,
423                             fileRev,
424                             requestcb,
425                             deleteItem,
426                             configuration.shouldPersistDocNamespace());
427    pendingReqsQ.push_back(req);
428}
429
430GetValue CouchKVStore::get(const StoredDocKey& key,
431                           uint16_t vb,
432                           bool fetchDelete) {
433    DbHolder db(*this);
434    couchstore_error_t errCode = openDB(vb, db, COUCHSTORE_OPEN_FLAG_RDONLY);
435    if (errCode != COUCHSTORE_SUCCESS) {
436        ++st.numGetFailure;
437        logger.log(EXTENSION_LOG_WARNING,
438                   "CouchKVStore::get: openDB error:%s, vb:%" PRIu16,
439                   couchstore_strerror(errCode), vb);
440        return GetValue(nullptr, couchErr2EngineErr(errCode));
441    }
442
443    GetValue gv = getWithHeader(db, key, vb, GetMetaOnly::No, fetchDelete);
444    return gv;
445}
446
447GetValue CouchKVStore::getWithHeader(void* dbHandle,
448                                     const StoredDocKey& key,
449                                     uint16_t vb,
450                                     GetMetaOnly getMetaOnly,
451                                     bool fetchDelete) {
452    Db *db = (Db *)dbHandle;
453    auto start = ProcessClock::now();
454    DocInfo *docInfo = NULL;
455    sized_buf id;
456    GetValue rv;
457
458    if (configuration.shouldPersistDocNamespace()) {
459        id = {const_cast<char*>(reinterpret_cast<const char*>(
460                      key.getDocNameSpacedData())),
461              key.getDocNameSpacedSize()};
462
463    } else {
464        id = {const_cast<char*>(reinterpret_cast<const char*>(key.data())),
465              key.size()};
466    }
467
468    couchstore_error_t errCode = couchstore_docinfo_by_id(db, (uint8_t *)id.buf,
469                                                          id.size, &docInfo);
470    if (errCode != COUCHSTORE_SUCCESS) {
471        if (getMetaOnly == GetMetaOnly::No) {
472            // log error only if this is non-xdcr case
473            logger.log(EXTENSION_LOG_WARNING,
474                       "CouchKVStore::getWithHeader: couchstore_docinfo_by_id "
475                       "error:%s [%s], vb:%" PRIu16,
476                       couchstore_strerror(errCode),
477                       couchkvstore_strerrno(db, errCode).c_str(), vb);
478        }
479    } else {
480        if (docInfo == nullptr) {
481            throw std::logic_error("CouchKVStore::getWithHeader: "
482                    "couchstore_docinfo_by_id returned success but docInfo "
483                    "is NULL");
484        }
485        errCode = fetchDoc(db, docInfo, rv, vb, getMetaOnly);
486        if (errCode != COUCHSTORE_SUCCESS) {
487            logger.log(EXTENSION_LOG_WARNING,
488                       "CouchKVStore::getWithHeader: fetchDoc error:%s [%s],"
489                       " vb:%" PRIu16 ", deleted:%s",
490                       couchstore_strerror(errCode),
491                       couchkvstore_strerrno(db, errCode).c_str(), vb,
492                       docInfo->deleted ? "yes" : "no");
493        }
494
495        // record stats
496        st.readTimeHisto.add(
497                std::chrono::duration_cast<std::chrono::microseconds>(
498                        ProcessClock::now() - start));
499        if (errCode == COUCHSTORE_SUCCESS) {
500            st.readSizeHisto.add(key.size() + rv.item->getNBytes());
501        }
502    }
503
504    if(errCode != COUCHSTORE_SUCCESS) {
505        ++st.numGetFailure;
506    }
507
508    couchstore_free_docinfo(docInfo);
509    rv.setStatus(couchErr2EngineErr(errCode));
510    return rv;
511}
512
513void CouchKVStore::getMulti(uint16_t vb, vb_bgfetch_queue_t &itms) {
514    if (itms.empty()) {
515        return;
516    }
517    int numItems = itms.size();
518
519    DbHolder db(*this);
520    couchstore_error_t errCode = openDB(vb, db, COUCHSTORE_OPEN_FLAG_RDONLY);
521    if (errCode != COUCHSTORE_SUCCESS) {
522        logger.log(EXTENSION_LOG_WARNING,
523                   "CouchKVStore::getMulti: openDB error:%s, "
524                   "vb:%" PRIu16 ", numDocs:%d",
525                   couchstore_strerror(errCode), vb, numItems);
526        st.numGetFailure += numItems;
527        for (auto& item : itms) {
528            item.second.value.setStatus(ENGINE_NOT_MY_VBUCKET);
529        }
530        return;
531    }
532
533    size_t idx = 0;
534    std::vector<sized_buf> ids(itms.size());
535    for (auto& item : itms) {
536        if (configuration.shouldPersistDocNamespace()) {
537            ids[idx] = {const_cast<char*>(reinterpret_cast<const char*>(
538                                item.first.getDocNameSpacedData())),
539                        item.first.getDocNameSpacedSize()};
540        } else {
541            ids[idx] = {const_cast<char*>(reinterpret_cast<const char*>(
542                                item.first.data())),
543                        item.first.size()};
544        }
545
546        ++idx;
547    }
548
549    GetMultiCbCtx ctx(*this, vb, itms);
550
551    errCode = couchstore_docinfos_by_id(
552            db, ids.data(), itms.size(), 0, getMultiCbC, &ctx);
553    if (errCode != COUCHSTORE_SUCCESS) {
554        st.numGetFailure += numItems;
555        logger.log(EXTENSION_LOG_WARNING,
556                   "CouchKVStore::getMulti: "
557                   "couchstore_docinfos_by_id error %s [%s], vb:%" PRIu16,
558                   couchstore_strerror(errCode),
559                   couchkvstore_strerrno(db, errCode).c_str(),
560                   vb);
561        for (auto& item : itms) {
562            item.second.value.setStatus(couchErr2EngineErr(errCode));
563        }
564    }
565
566    // If available, record how many reads() we did for this getMulti;
567    // and the average reads per document.
568    auto* stats = couchstore_get_db_filestats(db);
569    if (stats != nullptr) {
570        const auto readCount = stats->getReadCount();
571        st.getMultiFsReadCount += readCount;
572        st.getMultiFsReadHisto.add(readCount);
573        st.getMultiFsReadPerDocHisto.add(readCount / itms.size());
574    }
575}
576
577void CouchKVStore::del(const Item& itm, Callback<TransactionContext, int>& cb) {
578    if (isReadOnly()) {
579        throw std::logic_error("CouchKVStore::del: Not valid on a read-only "
580                        "object.");
581    }
582    if (!intransaction) {
583        throw std::invalid_argument("CouchKVStore::del: intransaction must be "
584                        "true to perform a delete operation.");
585    }
586
587    uint64_t fileRev = (*dbFileRevMap)[itm.getVBucketId()];
588    MutationRequestCallback requestcb;
589    requestcb.delCb = &cb;
590    CouchRequest* req =
591            new CouchRequest(itm,
592                             fileRev,
593                             requestcb,
594                             true,
595                             configuration.shouldPersistDocNamespace());
596    pendingReqsQ.push_back(req);
597}
598
599void CouchKVStore::delVBucket(uint16_t vbucket, uint64_t fileRev) {
600    if (isReadOnly()) {
601        throw std::logic_error("CouchKVStore::delVBucket: Not valid on a "
602                        "read-only object.");
603    }
604
605    unlinkCouchFile(vbucket, fileRev);
606}
607
608std::vector<vbucket_state *> CouchKVStore::listPersistedVbuckets() {
609    std::vector<vbucket_state*> result;
610    for (const auto& vb : cachedVBStates) {
611        result.emplace_back(vb.get());
612    }
613    return result;
614}
615
616void CouchKVStore::getPersistedStats(std::map<std::string,
617                                     std::string> &stats) {
618    std::vector<char> buffer;
619    std::string fname = dbname + "/stats.json";
620    if (access(fname.c_str(), R_OK) == -1) {
621        return ;
622    }
623
624    std::ifstream session_stats;
625    session_stats.exceptions (session_stats.failbit | session_stats.badbit);
626    try {
627        session_stats.open(fname.c_str(), std::ios::binary);
628        session_stats.seekg(0, std::ios::end);
629        int flen = session_stats.tellg();
630        if (flen < 0) {
631            logger.log(EXTENSION_LOG_WARNING, "CouchKVStore::getPersistedStats:"
632                       " Error in session stats ifstream!!!");
633            session_stats.close();
634            return;
635        }
636        session_stats.seekg(0, std::ios::beg);
637        buffer.resize(flen + 1);
638        session_stats.read(buffer.data(), flen);
639        session_stats.close();
640        buffer[flen] = '\0';
641
642        cJSON *json_obj = cJSON_Parse(buffer.data());
643        if (!json_obj) {
644            logger.log(EXTENSION_LOG_WARNING, "CouchKVStore::getPersistedStats:"
645                       " Failed to parse the session stats json doc!!!");
646            return;
647        }
648
649        int json_arr_size = cJSON_GetArraySize(json_obj);
650        for (int i = 0; i < json_arr_size; ++i) {
651            cJSON *obj = cJSON_GetArrayItem(json_obj, i);
652            if (obj) {
653                stats[obj->string] = obj->valuestring ? obj->valuestring : "";
654            }
655        }
656        cJSON_Delete(json_obj);
657
658    } catch (const std::ifstream::failure &e) {
659        logger.log(EXTENSION_LOG_WARNING,
660                   "CouchKVStore::getPersistedStats: Failed to load the engine "
661                   "session stats due to IO exception \"%s\"", e.what());
662    } catch (...) {
663        logger.log(EXTENSION_LOG_WARNING,
664                   "CouchKVStore::getPersistedStats: Failed to load the engine "
665                   "session stats due to IO exception");
666    }
667}
668
/**
 * Compose the path of a vBucket database file:
 * "<dbname>/<vbid>.couch.<rev>".
 *
 * @param dbname directory part of the path
 * @param vbid   vBucket id
 * @param rev    file revision
 * @return the composed path
 */
static std::string getDBFileName(const std::string &dbname,
                                 uint16_t vbid,
                                 uint64_t rev) {
    std::string path(dbname);
    path += "/";
    path += std::to_string(vbid);
    path += ".couch.";
    path += std::to_string(rev);
    return path;
}
675
676static int edit_docinfo_hook(DocInfo **info, const sized_buf *item) {
677    // Examine the metadata of the doc
678    auto documentMetaData = MetaDataFactory::createMetaData((*info)->rev_meta);
679    // Allocate latest metadata
680    std::unique_ptr<MetaData> metadata;
681    if (documentMetaData->getVersionInitialisedFrom() == MetaData::Version::V0) {
682        // Metadata doesn't have flex_meta_code/datatype. Provision space for
683        // these paramenters.
684
685        // If the document is compressed we need to inflate it to
686        // determine if it is json or not.
687        cb::compression::Buffer inflated;
688        cb::const_char_buffer data {item->buf, item->size};
689        if (((*info)->content_meta | COUCH_DOC_IS_COMPRESSED) ==
690                (*info)->content_meta) {
691            if (!cb::compression::inflate(cb::compression::Algorithm::Snappy,
692                                          data, inflated)) {
693                throw std::runtime_error(
694                    "edit_docinfo_hook: failed to inflate document with seqno: " +
695                    std::to_string((*info)->db_seq) + " revno: " +
696                    std::to_string((*info)->rev_seq));
697            }
698            data = inflated;
699        }
700
701        protocol_binary_datatype_t datatype = PROTOCOL_BINARY_RAW_BYTES;
702        if (checkUTF8JSON(reinterpret_cast<const uint8_t*>(data.data()),
703                          data.size())) {
704            datatype = PROTOCOL_BINARY_DATATYPE_JSON;
705        }
706
707        // Now create a blank latest metadata.
708        metadata = MetaDataFactory::createMetaData();
709        // Copy the metadata this will pull across available V0 fields.
710        *metadata = *documentMetaData;
711
712        // Setup flex code and datatype
713        metadata->setFlexCode();
714        metadata->setDataType(datatype);
715    } else {
716        // The metadata in the document is V1 and needs no changes.
717        return 0;
718    }
719
720    // the docInfo pointer includes the DocInfo and the data it points to.
721    // this must be a pointer which cb_free() can deallocate
722    char* buffer = static_cast<char*>(cb_calloc(1, sizeof(DocInfo) +
723                             (*info)->id.size +
724                             MetaData::getMetaDataSize(MetaData::Version::V1)));
725
726
727    DocInfo* docInfo = reinterpret_cast<DocInfo*>(buffer);
728
729    // Deep-copy the incoming DocInfo, then we'll fix the pointers/buffer data
730    *docInfo = **info;
731
732    // Correct the id buffer
733    docInfo->id.buf = buffer + sizeof(DocInfo);
734    std::memcpy(docInfo->id.buf, (*info)->id.buf, docInfo->id.size);
735
736    // Correct the rev_meta pointer and fill it in.
737    docInfo->rev_meta.size = MetaData::getMetaDataSize(MetaData::Version::V1);
738    docInfo->rev_meta.buf = buffer + sizeof(DocInfo) + docInfo->id.size;
739    metadata->copyToBuf(docInfo->rev_meta);
740
741    // Free the orginal
742    couchstore_free_docinfo(*info);
743
744    // Return the newly allocated docinfo with corrected metadata
745    *info = docInfo;
746
747    return 1;
748}
749
750/**
751 * Notify the expiry callback that a document has expired
752 *
753 * @param info     document information for the expired item
754 * @param metadata metadata of the document
755 * @param item     buffer containing data and size
756 * @param ctx      context for compaction
757 * @param currtime current time
758 */
759static int notify_expired_item(DocInfo& info,
760                               MetaData& metadata,
761                               sized_buf item,
762                               compaction_ctx& ctx,
763                               time_t currtime) {
764    cb::char_buffer data;
765    cb::compression::Buffer inflated;
766
767    if (mcbp::datatype::is_xattr(metadata.getDataType())) {
768        if (item.buf == nullptr) {
769            // We need to pass on the entire document to the callback
770            return COUCHSTORE_COMPACT_NEED_BODY;
771        }
772
773        // A document on disk is marked snappy in two ways.
774        // 1) info.content_meta if the document was compressed by couchstore
775        // 2) datatype snappy if the document was already compressed when stored
776        if ((info.content_meta & COUCH_DOC_IS_COMPRESSED) ||
777            mcbp::datatype::is_snappy(metadata.getDataType())) {
778            using namespace cb::compression;
779
780            if (!inflate(Algorithm::Snappy, {item.buf, item.size}, inflated)) {
781                LOG(EXTENSION_LOG_WARNING,
782                    "time_purge_hook: failed to inflate document with seqno %" PRIu64 ""
783                    "revno: %" PRIu64, info.db_seq, info.rev_seq);
784                return COUCHSTORE_ERROR_CORRUPT;
785            }
786            // Now remove snappy bit
787            metadata.setDataType(metadata.getDataType() &
788                                 ~PROTOCOL_BINARY_DATATYPE_SNAPPY);
789            data = inflated;
790        }
791    }
792
793    // Collections: TODO: Restore to stored namespace
794    Item it(makeDocKey(info.id, ctx.config->shouldPersistDocNamespace()),
795            metadata.getFlags(),
796            metadata.getExptime(),
797            data.buf,
798            data.len,
799            metadata.getDataType(),
800            metadata.getCas(),
801            info.db_seq,
802            ctx.db_file_id,
803            info.rev_seq);
804
805    it.setRevSeqno(info.rev_seq);
806    ctx.expiryCallback->callback(it, currtime);
807
808    return COUCHSTORE_SUCCESS;
809}
810
811static int time_purge_hook(Db* d, DocInfo* info, sized_buf item, void* ctx_p) {
812    compaction_ctx* ctx = static_cast<compaction_ctx*>(ctx_p);
813    const uint16_t vbid = ctx->db_file_id;
814
815    if (info == nullptr) {
816        // Compaction finished
817        return couchstore_set_purge_seq(d, ctx->max_purged_seq[vbid]);
818    }
819
820    DbInfo infoDb;
821    auto err = couchstore_db_info(d, &infoDb);
822    if (err != COUCHSTORE_SUCCESS) {
823        LOG(EXTENSION_LOG_WARNING,
824            "time_purge_hook: couchstore_db_info() failed: %s",
825            couchstore_strerror(err));
826        return err;
827    }
828
829    uint64_t max_purge_seq = 0;
830    auto it = ctx->max_purged_seq.find(vbid);
831
832    if (it == ctx->max_purged_seq.end()) {
833        ctx->max_purged_seq[vbid] = 0;
834    } else {
835        max_purge_seq = it->second;
836    }
837
838    if (info->rev_meta.size >= MetaData::getMetaDataSize(MetaData::Version::V0)) {
839        auto metadata = MetaDataFactory::createMetaData(info->rev_meta);
840        uint32_t exptime = metadata->getExptime();
841
842        // Is the collections eraser installed?
843        if (ctx->collectionsEraser &&
844            ctx->collectionsEraser(
845                    makeDocKey(info->id,
846                               ctx->config->shouldPersistDocNamespace()),
847                    int64_t(info->db_seq),
848                    info->deleted,
849                    ctx->eraserContext)) {
850            ctx->stats.collectionsItemsPurged++;
851            return COUCHSTORE_COMPACT_DROP_ITEM;
852        }
853
854        if (info->deleted) {
855            if (info->db_seq != infoDb.last_sequence) {
856                if (ctx->drop_deletes) { // all deleted items must be dropped ...
857                    if (max_purge_seq < info->db_seq) {
858                        ctx->max_purged_seq[vbid] = info->db_seq; // track max_purged_seq
859                    }
860                    ctx->stats.tombstonesPurged++;
861                    return COUCHSTORE_COMPACT_DROP_ITEM;      // ...unconditionally
862                }
863                if (exptime < ctx->purge_before_ts &&
864                        (!ctx->purge_before_seq ||
865                         info->db_seq <= ctx->purge_before_seq)) {
866                    if (max_purge_seq < info->db_seq) {
867                        ctx->max_purged_seq[vbid] = info->db_seq;
868                    }
869                    ctx->stats.tombstonesPurged++;
870                    return COUCHSTORE_COMPACT_DROP_ITEM;
871                }
872            }
873        } else {
874            time_t currtime = ep_real_time();
875            if (exptime && exptime < currtime) {
876                int ret;
877                try {
878                    ret = notify_expired_item(*info, *metadata, item,
879                                             *ctx, currtime);
880                } catch (const std::bad_alloc&) {
881                    LOG(EXTENSION_LOG_WARNING,
882                        "time_purge_hook: memory allocation failed");
883                    return COUCHSTORE_ERROR_ALLOC_FAIL;
884                }
885
886                if (ret != COUCHSTORE_SUCCESS) {
887                    return ret;
888                }
889            }
890        }
891    }
892
893    if (ctx->bloomFilterCallback) {
894        bool deleted = info->deleted;
895        // Collections: TODO: Permanently restore to stored namespace
896        DocKey key = makeDocKey(
897                info->id, ctx->config->shouldPersistDocNamespace());
898
899        try {
900            ctx->bloomFilterCallback->callback(
901                    ctx->db_file_id, key, deleted);
902        } catch (std::runtime_error& re) {
903            LOG(EXTENSION_LOG_WARNING,
904                "time_purge_hook: exception occurred when invoking the "
905                "bloomfilter callback on vbucket:%" PRIu16
906                " - Details: %s", vbid, re.what());
907        }
908    }
909
910    return COUCHSTORE_COMPACT_KEEP_ITEM;
911}
912
913bool CouchKVStore::compactDB(compaction_ctx *hook_ctx) {
914    bool result = false;
915
916    try {
917        result = compactDBInternal(hook_ctx, edit_docinfo_hook);
918    } catch(std::logic_error& le) {
919        LOG(EXTENSION_LOG_WARNING,
920            "CouchKVStore::compactDB: exception while performing "
921            "compaction for vbucket:%" PRIu16
922            " - Details: %s", hook_ctx->db_file_id, le.what());
923    }
924    if (!result) {
925        ++st.numCompactionFailure;
926    }
927    return result;
928}
929
930FileInfo CouchKVStore::toFileInfo(const DbInfo& info) {
931    return FileInfo{
932            info.doc_count, info.deleted_count, info.file_size, info.purge_seq};
933}
934
/**
 * Compact the vbucket file identified by hook_ctx->db_file_id.
 *
 * The current revision of the file is compacted into "<file>.compact", which
 * is then renamed to the next revision number. Once the new file has been
 * opened successfully the file-revision map and cached statistics are
 * updated and the previous revision is unlinked.
 *
 * @param hook_ctx     compaction context; its config pointer is set here and
 *                     its stats.pre / stats.post are filled in
 * @param docinfo_hook docinfo hook forwarded to couchstore_compact_db_ex()
 * @return true on success; false if opening, compacting, renaming or
 *         re-opening failed (each failure is logged)
 * @throws std::logic_error if this is a read-only instance
 */
bool CouchKVStore::compactDBInternal(compaction_ctx* hook_ctx,
                                     couchstore_docinfo_hook docinfo_hook) {
    if (isReadOnly()) {
        throw std::logic_error("CouchKVStore::compactDB: Cannot perform "
                        "on a read-only instance.");
    }
    couchstore_compact_hook       hook = time_purge_hook;
    couchstore_docinfo_hook dhook = docinfo_hook;
    // Compaction uses its own file-ops object (not statCollectingFileOps)
    FileOpsInterface         *def_iops = statCollectingFileOpsCompaction.get();
    DbHolder compactdb(*this);
    DbHolder targetDb(*this);
    couchstore_error_t         errCode = COUCHSTORE_SUCCESS;
    ProcessClock::time_point     start = ProcessClock::now();
    std::string                 dbfile;
    std::string           compact_file;
    std::string               new_file;
    DbInfo                        info;
    uint16_t                      vbid = hook_ctx->db_file_id;
    // time_purge_hook reads the configuration through the context
    hook_ctx->config = &configuration;

    TRACE_EVENT1("CouchKVStore", "compactDB", "vbid", vbid);

    // Open the source VBucket database file ...
    errCode = openDB(
            vbid, compactdb, (uint64_t)COUCHSTORE_OPEN_FLAG_RDONLY, def_iops);
    if (errCode != COUCHSTORE_SUCCESS) {
        logger.log(EXTENSION_LOG_WARNING,
                   "CouchKVStore::compactDB openDB error:%s, vb:%" PRIu16
                   ", fileRev:%" PRIu64,
                   couchstore_strerror(errCode),
                   vbid,
                   compactdb.getFileRev());
        return false;
    }

    // The compacted file takes the revision after the one just opened
    uint64_t new_rev = compactdb.getFileRev() + 1;

    // Build the temporary vbucket.compact file name
    dbfile = getDBFileName(dbname, vbid, compactdb.getFileRev());
    compact_file = dbfile + ".compact";

    couchstore_open_flags flags(COUCHSTORE_COMPACT_FLAG_UPGRADE_DB);

    // NOTE(review): the return value of couchstore_db_info() is ignored here;
    // if the call fails, `info` is read without having been filled in.
    couchstore_db_info(compactdb, &info);
    hook_ctx->stats.pre = toFileInfo(info);

    /**
     * This flag disables IO buffering in couchstore which means
     * file operations will trigger syscalls immediately. This has
     * a detrimental impact on performance and is only intended
     * for testing.
     */
    if(!configuration.getBuffered()) {
        flags |= COUCHSTORE_OPEN_FLAG_UNBUFFERED;
    }

    // Should automatic fsync() be configured for compaction?
    const auto periodicSyncBytes = configuration.getPeriodicSyncBytes();
    if (periodicSyncBytes != 0) {
        flags |= couchstore_encode_periodic_sync_flags(periodicSyncBytes);
    }

    // Perform COMPACTION of vbucket.couch.rev into vbucket.couch.rev.compact
    errCode = couchstore_compact_db_ex(compactdb,
                                       compact_file.c_str(),
                                       flags,
                                       hook,
                                       dhook,
                                       hook_ctx,
                                       def_iops);
    if (errCode != COUCHSTORE_SUCCESS) {
        logger.log(EXTENSION_LOG_WARNING,
                   "CouchKVStore::compactDB:couchstore_compact_db_ex "
                   "error:%s [%s], name:%s",
                   couchstore_strerror(errCode),
                   couchkvstore_strerrno(compactdb, errCode).c_str(),
                   dbfile.c_str());
        // NOTE(review): the .compact file is not removed on this path
        // (unlike the rename failure below) - confirm it is cleaned up
        // elsewhere.
        return false;
    }

    // Close the source Database File once compaction is done
    compactdb.close();

    // Rename the .compact file to one with the next revision number
    new_file = getDBFileName(dbname, vbid, new_rev);
    if (rename(compact_file.c_str(), new_file.c_str()) != 0) {
        logger.log(EXTENSION_LOG_WARNING,
                   "CouchKVStore::compactDB: rename error:%s, old:%s, new:%s",
                   cb_strerror().c_str(), compact_file.c_str(), new_file.c_str());

        removeCompactFile(compact_file);
        return false;
    }

    // Open the newly compacted VBucket database file ...
    errCode = openSpecificDB(
            vbid, new_rev, targetDb, (uint64_t)COUCHSTORE_OPEN_FLAG_RDONLY);
    if (errCode != COUCHSTORE_SUCCESS) {
        logger.log(EXTENSION_LOG_WARNING,
                   "CouchKVStore::compactDB: openDB#2 error:%s, file:%s, "
                   "fileRev:%" PRIu64,
                   couchstore_strerror(errCode),
                   new_file.c_str(),
                   targetDb.getFileRev());
        // The file map has not been updated yet, so only the new file is
        // discarded here.
        if (remove(new_file.c_str()) != 0) {
            logger.log(EXTENSION_LOG_WARNING,
                       "CouchKVStore::compactDB: remove error:%s, path:%s",
                       cb_strerror().c_str(), new_file.c_str());
        }
        return false;
    }

    // Update the global VBucket file map so all operations use the new file
    updateDbFileMap(vbid, new_rev);

    logger.log(EXTENSION_LOG_INFO,
               "INFO: created new couch db file, name:%s rev:%" PRIu64,
               new_file.c_str(), new_rev);

    // NOTE(review): return value ignored here as well; on failure `info`
    // still holds the pre-compaction values read above.
    couchstore_db_info(targetDb.getDb(), &info);
    hook_ctx->stats.post = toFileInfo(info);

    cachedFileSize[vbid] = info.file_size;
    cachedSpaceUsed[vbid] = info.space_used;

    // also update cached state with dbinfo
    vbucket_state* state = getVBucketState(vbid);
    if (state) {
        state->highSeqno = info.last_sequence;
        state->purgeSeqno = info.purge_seq;
        cachedDeleteCount[vbid] = info.deleted_count;
        cachedDocCount[vbid] = info.doc_count;
    }

    // Removing the stale couch file
    // (compactdb was closed above; this relies on the holder still reporting
    // the revision it was opened with - TODO confirm)
    unlinkCouchFile(vbid, compactdb.getFileRev());

    st.compactHisto.add(std::chrono::duration_cast<std::chrono::microseconds>(
            ProcessClock::now() - start));

    return true;
}
1077
1078vbucket_state * CouchKVStore::getVBucketState(uint16_t vbucketId) {
1079    return cachedVBStates[vbucketId].get();
1080}
1081
1082bool CouchKVStore::setVBucketState(uint16_t vbucketId,
1083                                   const vbucket_state& vbstate,
1084                                   VBStatePersist options) {
1085    std::map<uint16_t, uint64_t>::iterator mapItr;
1086    couchstore_error_t errorCode;
1087
1088    if (options == VBStatePersist::VBSTATE_PERSIST_WITHOUT_COMMIT ||
1089            options == VBStatePersist::VBSTATE_PERSIST_WITH_COMMIT) {
1090        DbHolder db(*this);
1091        errorCode =
1092                openDB(vbucketId, db, (uint64_t)COUCHSTORE_OPEN_FLAG_CREATE);
1093        if (errorCode != COUCHSTORE_SUCCESS) {
1094            ++st.numVbSetFailure;
1095            logger.log(EXTENSION_LOG_WARNING,
1096                       "CouchKVStore::setVBucketState: openDB error:%s, "
1097                       "vb:%" PRIu16 ", fileRev:%" PRIu64,
1098                       couchstore_strerror(errorCode),
1099                       vbucketId,
1100                       db.getFileRev());
1101            return false;
1102        }
1103
1104        errorCode = saveVBState(db, vbstate);
1105        if (errorCode != COUCHSTORE_SUCCESS) {
1106            ++st.numVbSetFailure;
1107            logger.log(EXTENSION_LOG_WARNING,
1108                       "CouchKVStore:setVBucketState: saveVBState error:%s, "
1109                       "vb:%" PRIu16 ", fileRev:%" PRIu64,
1110                       couchstore_strerror(errorCode),
1111                       vbucketId,
1112                       db.getFileRev());
1113            return false;
1114        }
1115
1116        if (options == VBStatePersist::VBSTATE_PERSIST_WITH_COMMIT) {
1117            errorCode = couchstore_commit(db);
1118            if (errorCode != COUCHSTORE_SUCCESS) {
1119                ++st.numVbSetFailure;
1120                logger.log(EXTENSION_LOG_WARNING,
1121                           "CouchKVStore:setVBucketState: couchstore_commit "
1122                           "error:%s [%s], vb:%" PRIu16 ", rev:%" PRIu64,
1123                           couchstore_strerror(errorCode),
1124                           couchkvstore_strerrno(db, errorCode).c_str(),
1125                           vbucketId,
1126                           db.getFileRev());
1127                return false;
1128            }
1129        }
1130
1131        DbInfo info;
1132        errorCode = couchstore_db_info(db, &info);
1133        if (errorCode != COUCHSTORE_SUCCESS) {
1134            logger.log(EXTENSION_LOG_WARNING,
1135                       "CouchKVStore::setVBucketState: couchstore_db_info "
1136                       "error:%s, vb:%" PRIu16, couchstore_strerror(errorCode),
1137                       vbucketId);
1138        } else {
1139            cachedSpaceUsed[vbucketId] = info.space_used;
1140            cachedFileSize[vbucketId] = info.file_size;
1141        }
1142    } else {
1143        throw std::invalid_argument("CouchKVStore::setVBucketState: invalid vb state "
1144                        "persist option specified for vbucket id:" +
1145                        std::to_string(vbucketId));
1146    }
1147
1148    return true;
1149}
1150
1151bool CouchKVStore::snapshotVBucket(uint16_t vbucketId,
1152                                   const vbucket_state &vbstate,
1153                                   VBStatePersist options) {
1154    if (isReadOnly()) {
1155        logger.log(EXTENSION_LOG_WARNING,
1156                   "CouchKVStore::snapshotVBucket: cannot be performed on a "
1157                   "read-only KVStore instance");
1158        return false;
1159    }
1160
1161    auto start = ProcessClock::now();
1162
1163    if (updateCachedVBState(vbucketId, vbstate) &&
1164         (options == VBStatePersist::VBSTATE_PERSIST_WITHOUT_COMMIT ||
1165          options == VBStatePersist::VBSTATE_PERSIST_WITH_COMMIT)) {
1166        vbucket_state* vbs = getVBucketState(vbucketId);
1167        if (!setVBucketState(vbucketId, *vbs, options)) {
1168            logger.log(EXTENSION_LOG_WARNING,
1169                       "CouchKVStore::snapshotVBucket: setVBucketState failed "
1170                       "state:%s, vb:%" PRIu16,
1171                       VBucket::toString(vbstate.state), vbucketId);
1172            return false;
1173        }
1174    }
1175
1176    LOG(EXTENSION_LOG_DEBUG,
1177        "CouchKVStore::snapshotVBucket: Snapshotted vbucket:%" PRIu16 " state:%s",
1178        vbucketId,
1179        vbstate.toJSON().c_str());
1180
1181    st.snapshotHisto.add(std::chrono::duration_cast<std::chrono::microseconds>(
1182            ProcessClock::now() - start));
1183
1184    return true;
1185}
1186
1187StorageProperties CouchKVStore::getStorageProperties() {
1188    StorageProperties rv(StorageProperties::EfficientVBDump::Yes,
1189                         StorageProperties::EfficientVBDeletion::Yes,
1190                         StorageProperties::PersistedDeletion::Yes,
1191                         StorageProperties::EfficientGet::Yes,
1192                         StorageProperties::ConcurrentWriteCompact::No);
1193    return rv;
1194}
1195
1196bool CouchKVStore::commit(const Item* collectionsManifest) {
1197    if (isReadOnly()) {
1198        throw std::logic_error("CouchKVStore::commit: Not valid on a read-only "
1199                        "object.");
1200    }
1201
1202    if (intransaction) {
1203        if (commit2couchstore(collectionsManifest)) {
1204            intransaction = false;
1205            transactionCtx.reset();
1206        }
1207    }
1208
1209    return !intransaction;
1210}
1211
1212bool CouchKVStore::getStat(const char* name, size_t& value)  {
1213    if (strcmp("failure_compaction", name) == 0) {
1214        value = st.numCompactionFailure.load();
1215        return true;
1216    } else if (strcmp("failure_get", name) == 0) {
1217        value = st.numGetFailure.load();
1218        return true;
1219    } else if (strcmp("io_total_read_bytes", name) == 0) {
1220        value = st.fsStats.totalBytesRead.load() +
1221                st.fsStatsCompaction.totalBytesRead.load();
1222        return true;
1223    } else if (strcmp("io_total_write_bytes", name) == 0) {
1224        value = st.fsStats.totalBytesWritten.load() +
1225                st.fsStatsCompaction.totalBytesWritten.load();
1226        return true;
1227    } else if (strcmp("io_compaction_read_bytes", name) == 0) {
1228        value = st.fsStatsCompaction.totalBytesRead;
1229        return true;
1230    } else if (strcmp("io_compaction_write_bytes", name) == 0) {
1231        value = st.fsStatsCompaction.totalBytesWritten;
1232        return true;
1233    } else if (strcmp("io_bg_fetch_read_count", name) == 0) {
1234        value = st.getMultiFsReadCount;
1235        return true;
1236    }
1237
1238    return false;
1239}
1240
1241void CouchKVStore::pendingTasks() {
1242    if (isReadOnly()) {
1243        throw std::logic_error("CouchKVStore::pendingTasks: Not valid on a "
1244                        "read-only object.");
1245    }
1246
1247    if (!pendingFileDeletions.empty()) {
1248        std::queue<std::string> queue;
1249        pendingFileDeletions.getAll(queue);
1250
1251        while (!queue.empty()) {
1252            std::string filename_str = queue.front();
1253            if (remove(filename_str.c_str()) == -1) {
1254                logger.log(EXTENSION_LOG_WARNING, "CouchKVStore::pendingTasks: "
1255                           "remove error:%d, file%s", errno,
1256                           filename_str.c_str());
1257                if (errno != ENOENT) {
1258                    pendingFileDeletions.push(filename_str);
1259                }
1260            }
1261            queue.pop();
1262        }
1263    }
1264}
1265
1266ScanContext* CouchKVStore::initScanContext(
1267        std::shared_ptr<StatusCallback<GetValue>> cb,
1268        std::shared_ptr<StatusCallback<CacheLookup>> cl,
1269        uint16_t vbid,
1270        uint64_t startSeqno,
1271        DocumentFilter options,
1272        ValueFilter valOptions) {
1273    DbHolder db(*this);
1274    couchstore_error_t errorCode =
1275            openDB(vbid, db, COUCHSTORE_OPEN_FLAG_RDONLY);
1276    if (errorCode != COUCHSTORE_SUCCESS) {
1277        logger.log(EXTENSION_LOG_WARNING,
1278                   "CouchKVStore::initScanContext: openDB error:%s, "
1279                   "name:%s/%" PRIu16 ".couch.%" PRIu64,
1280                   couchstore_strerror(errorCode),
1281                   dbname.c_str(),
1282                   vbid,
1283                   db.getFileRev());
1284        return NULL;
1285    }
1286
1287    DbInfo info;
1288    errorCode = couchstore_db_info(db, &info);
1289    if (errorCode != COUCHSTORE_SUCCESS) {
1290        logger.log(EXTENSION_LOG_WARNING,
1291                   "CouchKVStore::initScanContext: couchstore_db_info error:%s",
1292                   couchstore_strerror(errorCode));
1293        LOG(EXTENSION_LOG_WARNING,
1294            "CouchKVStore::initScanContext: Failed to read DB info for "
1295            "backfill. vb:%" PRIu16 " rev:%" PRIu64 " error: %s",
1296            vbid,
1297            db.getFileRev(),
1298            couchstore_strerror(errorCode));
1299        return NULL;
1300    }
1301
1302    uint64_t count = 0;
1303    errorCode = couchstore_changes_count(
1304            db, startSeqno, std::numeric_limits<uint64_t>::max(), &count);
1305    if (errorCode != COUCHSTORE_SUCCESS) {
1306        LOG(EXTENSION_LOG_WARNING,
1307            "CouchKVStore::initScanContext:Failed to obtain changes "
1308            "count for vb:%" PRIu16 " rev:%" PRIu64 " start_seqno:%" PRIu64
1309            " error: %s",
1310            vbid,
1311            db.getFileRev(),
1312            startSeqno,
1313            couchstore_strerror(errorCode));
1314        return NULL;
1315    }
1316
1317    size_t scanId = scanCounter++;
1318
1319    {
1320        LockHolder lh(scanLock);
1321        scans[scanId] = db.releaseDb();
1322    }
1323
1324    ScanContext* sctx = new ScanContext(cb,
1325                                        cl,
1326                                        vbid,
1327                                        scanId,
1328                                        startSeqno,
1329                                        info.last_sequence,
1330                                        info.purge_seq,
1331                                        options,
1332                                        valOptions,
1333                                        count,
1334                                        configuration);
1335    sctx->logger = &logger;
1336    return sctx;
1337}
1338
1339static couchstore_docinfos_options getDocFilter(const DocumentFilter& filter) {
1340    switch (filter) {
1341    case DocumentFilter::ALL_ITEMS:
1342        return COUCHSTORE_NO_OPTIONS;
1343    case DocumentFilter::NO_DELETES:
1344        return COUCHSTORE_NO_DELETES;
1345    }
1346
1347    std::string err("getDocFilter: Illegal document filter!" +
1348                    std::to_string(static_cast<int>(filter)));
1349    throw std::runtime_error(err);
1350}
1351
1352scan_error_t CouchKVStore::scan(ScanContext* ctx) {
1353    if (!ctx) {
1354        return scan_failed;
1355    }
1356
1357    if (ctx->lastReadSeqno == ctx->maxSeqno) {
1358        return scan_success;
1359    }
1360
1361    TRACE_EVENT_START2("CouchKVStore",
1362                       "scan",
1363                       "vbid",
1364                       ctx->vbid,
1365                       "startSeqno",
1366                       ctx->startSeqno);
1367
1368    Db* db;
1369    {
1370        LockHolder lh(scanLock);
1371        auto itr = scans.find(ctx->scanId);
1372        if (itr == scans.end()) {
1373            return scan_failed;
1374        }
1375
1376        db = itr->second;
1377    }
1378
1379    uint64_t start = ctx->startSeqno;
1380    if (ctx->lastReadSeqno != 0) {
1381        start = ctx->lastReadSeqno + 1;
1382    }
1383
1384    couchstore_error_t errorCode;
1385    errorCode = couchstore_changes_since(db,
1386                                         start,
1387                                         getDocFilter(ctx->docFilter),
1388                                         recordDbDumpC,
1389                                         static_cast<void*>(ctx));
1390
1391    TRACE_EVENT_END1(
1392            "CouchKVStore", "scan", "lastReadSeqno", ctx->lastReadSeqno);
1393
1394    if (errorCode != COUCHSTORE_SUCCESS) {
1395        if (errorCode == COUCHSTORE_ERROR_CANCEL) {
1396            return scan_again;
1397        } else {
1398            logger.log(EXTENSION_LOG_WARNING,
1399                       "CouchKVStore::scan couchstore_changes_since "
1400                       "error:%s [%s]", couchstore_strerror(errorCode),
1401                       couchkvstore_strerrno(db, errorCode).c_str());
1402            return scan_failed;
1403        }
1404    }
1405    return scan_success;
1406}
1407
1408void CouchKVStore::destroyScanContext(ScanContext* ctx) {
1409    if (!ctx) {
1410        return;
1411    }
1412
1413    LockHolder lh(scanLock);
1414    auto itr = scans.find(ctx->scanId);
1415    if (itr != scans.end()) {
1416        closeDatabaseHandle(itr->second);
1417        scans.erase(itr);
1418    }
1419    delete ctx;
1420}
1421
1422DbInfo CouchKVStore::getDbInfo(uint16_t vbid) {
1423    DbHolder db(*this);
1424    couchstore_error_t errCode = openDB(vbid, db, COUCHSTORE_OPEN_FLAG_RDONLY);
1425    if (errCode == COUCHSTORE_SUCCESS) {
1426        DbInfo info;
1427        errCode = couchstore_db_info(db, &info);
1428        if (errCode == COUCHSTORE_SUCCESS) {
1429            return info;
1430        } else {
1431            throw std::runtime_error(
1432                    "CouchKVStore::getDbInfo: failed "
1433                    "to read database info for vBucket " +
1434                    std::to_string(vbid) + " revision " +
1435                    std::to_string(db.getFileRev()) +
1436                    " - couchstore returned error: " +
1437                    couchstore_strerror(errCode));
1438        }
1439    } else {
1440        // open failed - map couchstore error code to exception.
1441        std::errc ec;
1442        switch (errCode) {
1443            case COUCHSTORE_ERROR_OPEN_FILE:
1444                ec = std::errc::no_such_file_or_directory; break;
1445            default:
1446                ec = std::errc::io_error; break;
1447        }
1448        throw std::system_error(
1449                std::make_error_code(ec),
1450                "CouchKVStore::getDbInfo: failed to open database file for "
1451                "vBucket = " +
1452                        std::to_string(vbid) + " rev = " +
1453                        std::to_string(db.getFileRev()) + " with error:" +
1454                        couchstore_strerror(errCode));
1455    }
1456}
1457
/**
 * Mark this store as no longer being inside a transaction. Nothing else is
 * done here.
 */
void CouchKVStore::close() {
    intransaction = false;
}
1461
1462uint64_t CouchKVStore::checkNewRevNum(std::string &dbFileName, bool newFile) {
1463    uint64_t newrev = 0;
1464    std::string nameKey;
1465
1466    if (!newFile) {
1467        // extract out the file revision number first
1468        size_t secondDot = dbFileName.rfind(".");
1469        nameKey = dbFileName.substr(0, secondDot);
1470    } else {
1471        nameKey = dbFileName;
1472    }
1473    nameKey.append(".");
1474    const auto files = cb::io::findFilesWithPrefix(nameKey);
1475    std::vector<std::string>::const_iterator itor;
1476    // found file(s) whoes name has the same key name pair with different
1477    // revision number
1478    for (itor = files.begin(); itor != files.end(); ++itor) {
1479        const std::string &filename = *itor;
1480        if (endWithCompact(filename)) {
1481            continue;
1482        }
1483
1484        size_t secondDot = filename.rfind(".");
1485        char *ptr = NULL;
1486        uint64_t revnum = strtoull(filename.substr(secondDot + 1).c_str(), &ptr, 10);
1487        if (newrev < revnum) {
1488            newrev = revnum;
1489            dbFileName = filename;
1490        }
1491    }
1492    return newrev;
1493}
1494
1495void CouchKVStore::updateDbFileMap(uint16_t vbucketId, uint64_t newFileRev) {
1496    if (vbucketId >= numDbFiles) {
1497        logger.log(EXTENSION_LOG_WARNING,
1498                   "CouchKVStore::updateDbFileMap: Cannot update db file map "
1499                   "for an invalid vbucket, vb:%" PRIu16", rev:%" PRIu64,
1500                   vbucketId, newFileRev);
1501        return;
1502    }
1503    // MB-27963: obtain write access whilst we update the file map openDB also
1504    // obtains this mutex to ensure the fileRev it obtains doesn't become stale
1505    // by the time it hits sys_open.
1506    std::lock_guard<cb::WriterLock> lg(openDbMutex);
1507
1508    (*dbFileRevMap)[vbucketId] = newFileRev;
1509}
1510
1511couchstore_error_t CouchKVStore::openDB(uint16_t vbucketId,
1512                                        DbHolder& db,
1513                                        couchstore_open_flags options,
1514                                        FileOpsInterface* ops) {
1515    // MB-27963: obtain read access whilst we open the file, updateDbFileMap
1516    // serialises on this mutex so we can be sure the fileRev we read should
1517    // still be a valid file once we hit sys_open
1518    std::lock_guard<cb::ReaderLock> lg(openDbMutex);
1519    uint64_t fileRev = (*dbFileRevMap)[vbucketId];
1520    return openSpecificDB(vbucketId, fileRev, db, options, ops);
1521}
1522
1523couchstore_error_t CouchKVStore::openSpecificDB(uint16_t vbucketId,
1524                                                uint64_t fileRev,
1525                                                DbHolder& db,
1526                                                couchstore_open_flags options,
1527                                                FileOpsInterface* ops) {
1528    std::string dbFileName = getDBFileName(dbname, vbucketId, fileRev);
1529    db.setFileRev(fileRev); // save the rev so the caller can log it
1530
1531    if(ops == nullptr) {
1532        ops = statCollectingFileOps.get();
1533    }
1534
1535    couchstore_error_t errorCode = COUCHSTORE_SUCCESS;
1536
1537    /**
1538     * This flag disables IO buffering in couchstore which means
1539     * file operations will trigger syscalls immediately. This has
1540     * a detrimental impact on performance and is only intended
1541     * for testing.
1542     */
1543    if(!configuration.getBuffered()) {
1544        options |= COUCHSTORE_OPEN_FLAG_UNBUFFERED;
1545    }
1546
1547    /* get the flags that determine the tracing and validation of
1548     *  couchstore file operations
1549     */
1550    if (configuration.getCouchstoreTracingEnabled()) {
1551        options |= COUCHSTORE_OPEN_WITH_TRACING;
1552        if (!(options & COUCHSTORE_OPEN_FLAG_RDONLY)) {
1553            TRACE_INSTANT2("couchstore_write",
1554                       "openSpecificDB",
1555                       "vbucketId",
1556                       vbucketId,
1557                       "fileRev",
1558                       fileRev);
1559        }
1560    }
1561    if (configuration.getCouchstoreWriteValidationEnabled()) {
1562        options |= COUCHSTORE_OPEN_WITH_WRITE_VALIDATION;
1563    }
1564    if (configuration.getCouchstoreMprotectEnabled()) {
1565        options |= COUCHSTORE_OPEN_WITH_MPROTECT;
1566    }
1567
1568    errorCode = couchstore_open_db_ex(
1569            dbFileName.c_str(), options, ops, db.getDbAddress());
1570
1571    /* update command statistics */
1572    st.numOpen++;
1573    if (errorCode) {
1574        st.numOpenFailure++;
1575        logger.log(EXTENSION_LOG_WARNING,
1576                   "CouchKVStore::openDB: error:%s [%s],"
1577                   " name:%s, option:%" PRIX64 ", fileRev:%" PRIu64,
1578                   couchstore_strerror(errorCode),
1579                   cb_strerror().c_str(),
1580                   dbFileName.c_str(),
1581                   options,
1582                   fileRev);
1583
1584        if (errorCode == COUCHSTORE_ERROR_NO_SUCH_FILE) {
1585            auto dotPos = dbFileName.find_last_of(".");
1586            if (dotPos != std::string::npos) {
1587                dbFileName = dbFileName.substr(0, dotPos);
1588            }
1589            auto files = cb::io::findFilesWithPrefix(dbFileName);
1590            logger.log(
1591                    EXTENSION_LOG_WARNING,
1592                    "CouchKVStore::openDB: No such file, found:%zd alternative "
1593                    "files for %s",
1594                    files.size(),
1595                    dbFileName.c_str());
1596            for (const auto& f : files) {
1597                logger.log(EXTENSION_LOG_WARNING,
1598                           "CouchKVStore::openDB: Found %s",
1599                           f.c_str());
1600            }
1601        }
1602    }
1603
1604    return errorCode;
1605}
1606
/**
 * Fill dbFileRevMap from a list of database file names and remove stale
 * revisions found on the way.
 *
 * Each name is split at its last dot into a name part and a revision; the
 * text between the last path separator and the last dot of the name part is
 * taken as the vbucket id. Names whose id part is not all digits are skipped.
 *
 * @param filenames file names to examine
 * @param vbids     if non-null, receives the vbucket id of every vbucket file
 *                  seen (one entry per file, so an id may appear repeatedly)
 */
void CouchKVStore::populateFileNameMap(std::vector<std::string> &filenames,
                                       std::vector<uint16_t> *vbids) {
    std::vector<std::string>::iterator fileItr;

    for (fileItr = filenames.begin(); fileItr != filenames.end(); ++fileItr) {
        const std::string &filename = *fileItr;
        // "<dir>/<vbid>.couch.<rev>": secondDot precedes <rev>, firstDot
        // precedes "couch"
        size_t secondDot = filename.rfind(".");
        std::string nameKey = filename.substr(0, secondDot);
        size_t firstDot = nameKey.rfind(".");
#ifdef _MSC_VER
        size_t firstSlash = nameKey.rfind("\\");
#else
        size_t firstSlash = nameKey.rfind("/");
#endif

        std::string revNumStr = filename.substr(secondDot + 1);
        char *ptr = NULL;
        uint64_t revNum = strtoull(revNumStr.c_str(), &ptr, 10);

        std::string vbIdStr = nameKey.substr(firstSlash + 1,
                                            (firstDot - firstSlash) - 1);
        if (allDigit(vbIdStr)) {
            int vbId = atoi(vbIdStr.c_str());
            if (vbids) {
                vbids->push_back(static_cast<uint16_t>(vbId));
            }
            // NOTE(review): vbId is used as an index without being checked
            // against numDbFiles (updateDbFileMap does check) - confirm that
            // out-of-range file names cannot reach this point.
            uint64_t old_rev_num = (*dbFileRevMap)[vbId];
            if (old_rev_num == revNum) {
                continue;
            } else if (old_rev_num < revNum) { // stale revision found
                (*dbFileRevMap)[vbId] = revNum;
            } else { // stale file found (revision id has rolled over)
                old_rev_num = revNum;
            }
            // old_rev_num now names the revision considered stale: either the
            // one previously in the map, or the file just examined.
            std::stringstream old_file;
            old_file << dbname << "/" << vbId << ".couch." << old_rev_num;
            if (access(old_file.str().c_str(), F_OK) == 0) {
                if (!isReadOnly()) {
                    if (remove(old_file.str().c_str()) == 0) {
                        logger.log(EXTENSION_LOG_INFO,
                                  "CouchKVStore::populateFileNameMap: Removed "
                                  "stale file:%s", old_file.str().c_str());
                    } else {
                        logger.log(EXTENSION_LOG_WARNING,
                                   "CouchKVStore::populateFileNameMap: remove "
                                   "error:%s, file:%s", cb_strerror().c_str(),
                                   old_file.str().c_str());
                    }
                } else {
                    // Read-only instances never delete files; just report it
                    logger.log(EXTENSION_LOG_WARNING,
                               "CouchKVStore::populateFileNameMap: A read-only "
                               "instance of the underlying store "
                               "was not allowed to delete a stale file:%s",
                               old_file.str().c_str());
                }
            }
        } else {
            // skip non-vbucket database file, master.couch etc
            logger.log(EXTENSION_LOG_DEBUG,
                       "CouchKVStore::populateFileNameMap: Non-vbucket database file, %s, skip adding "
                       "to CouchKVStore dbFileMap", filename.c_str());
        }
    }
}
1671
1672couchstore_error_t CouchKVStore::fetchDoc(Db* db,
1673                                          DocInfo* docinfo,
1674                                          GetValue& docValue,
1675                                          uint16_t vbId,
1676                                          GetMetaOnly metaOnly) {
1677    couchstore_error_t errCode = COUCHSTORE_SUCCESS;
1678    std::unique_ptr<MetaData> metadata;
1679    try {
1680        metadata = MetaDataFactory::createMetaData(docinfo->rev_meta);
1681    } catch (std::logic_error&) {
1682        return COUCHSTORE_ERROR_DB_NO_LONGER_VALID;
1683    }
1684
1685    if (metaOnly == GetMetaOnly::Yes) {
1686        // Collections: TODO: Permanently restore to stored namespace
1687        auto it = std::make_unique<Item>(
1688                makeDocKey(docinfo->id,
1689                           configuration.shouldPersistDocNamespace()),
1690                metadata->getFlags(),
1691                metadata->getExptime(),
1692                nullptr,
1693                docinfo->size,
1694                metadata->getDataType(),
1695                metadata->getCas(),
1696                docinfo->db_seq,
1697                vbId);
1698
1699        it->setRevSeqno(docinfo->rev_seq);
1700
1701        if (docinfo->deleted) {
1702            it->setDeleted();
1703        }
1704        docValue = GetValue(std::move(it));
1705        // update ep-engine IO stats
1706        ++st.io_bg_fetch_docs_read;
1707        st.io_bgfetch_doc_bytes += (docinfo->id.size + docinfo->rev_meta.size);
1708    } else {
1709        Doc *doc = nullptr;
1710        size_t valuelen = 0;
1711        void* valuePtr = nullptr;
1712        protocol_binary_datatype_t datatype = PROTOCOL_BINARY_RAW_BYTES;
1713        errCode = couchstore_open_doc_with_docinfo(db, docinfo, &doc,
1714                                                   DECOMPRESS_DOC_BODIES);
1715        if (errCode == COUCHSTORE_SUCCESS) {
1716            if (doc == nullptr) {
1717                throw std::logic_error("CouchKVStore::fetchDoc: doc is NULL");
1718            }
1719
1720            if (doc->id.size > UINT16_MAX) {
1721                throw std::logic_error("CouchKVStore::fetchDoc: "
1722                            "doc->id.size (which is" +
1723                            std::to_string(doc->id.size) + ") is greater than "
1724                            + std::to_string(UINT16_MAX));
1725            }
1726
1727            valuelen = doc->data.size;
1728            valuePtr = doc->data.buf;
1729
1730            if (metadata->getVersionInitialisedFrom() == MetaData::Version::V0) {
1731                // This is a super old version of a couchstore file.
1732                // Try to determine if the document is JSON or raw bytes
1733                datatype = determine_datatype(doc->data);
1734            } else {
1735                datatype = metadata->getDataType();
1736            }
1737        } else if (errCode == COUCHSTORE_ERROR_DOC_NOT_FOUND && docinfo->deleted) {
1738            datatype = metadata->getDataType();
1739        } else {
1740            return errCode;
1741        }
1742
1743        try {
1744            // Collections: TODO: Restore to stored namespace
1745            auto it = std::make_unique<Item>(
1746                    makeDocKey(docinfo->id,
1747                               configuration.shouldPersistDocNamespace()),
1748                    metadata->getFlags(),
1749                    metadata->getExptime(),
1750                    valuePtr,
1751                    valuelen,
1752                    datatype,
1753                    metadata->getCas(),
1754                    docinfo->db_seq,
1755                    vbId,
1756                    docinfo->rev_seq);
1757
1758             if (docinfo->deleted) {
1759                 it->setDeleted();
1760             }
1761             docValue = GetValue(std::move(it));
1762        } catch (std::bad_alloc&) {
1763            couchstore_free_document(doc);
1764            return COUCHSTORE_ERROR_ALLOC_FAIL;
1765        }
1766
1767        // update ep-engine IO stats
1768        ++st.io_bg_fetch_docs_read;
1769        st.io_bgfetch_doc_bytes +=
1770                (docinfo->id.size + docinfo->rev_meta.size + valuelen);
1771
1772        couchstore_free_document(doc);
1773    }
1774    return COUCHSTORE_SUCCESS;
1775}
1776
1777int CouchKVStore::recordDbDump(Db *db, DocInfo *docinfo, void *ctx) {
1778
1779    ScanContext* sctx = static_cast<ScanContext*>(ctx);
1780    auto* cb = sctx->callback.get();
1781    auto* cl = sctx->lookup.get();
1782
1783    Doc *doc = nullptr;
1784    sized_buf value{nullptr, 0};
1785    uint64_t byseqno = docinfo->db_seq;
1786    uint16_t vbucketId = sctx->vbid;
1787
1788    sized_buf key = docinfo->id;
1789    if (key.size > UINT16_MAX) {
1790        throw std::invalid_argument("CouchKVStore::recordDbDump: "
1791                        "docinfo->id.size (which is " + std::to_string(key.size) +
1792                        ") is greater than " + std::to_string(UINT16_MAX));
1793    }
1794
1795    // Collections: TODO: Permanently restore to stored namespace
1796    DocKey docKey = makeDocKey(
1797            docinfo->id, sctx->config.shouldPersistDocNamespace());
1798
1799    if (sctx->collectionsContext.manageSeparator(docKey)) {
1800        sctx->lastReadSeqno = byseqno;
1801        return COUCHSTORE_SUCCESS;
1802    }
1803
1804    CacheLookup lookup(docKey,
1805                       byseqno,
1806                       vbucketId,
1807                       sctx->collectionsContext.getSeparator());
1808    cl->callback(lookup);
1809    if (cl->getStatus() == ENGINE_KEY_EEXISTS) {
1810        sctx->lastReadSeqno = byseqno;
1811        return COUCHSTORE_SUCCESS;
1812    } else if (cl->getStatus() == ENGINE_ENOMEM) {
1813        return COUCHSTORE_ERROR_CANCEL;
1814    }
1815
1816    auto metadata = MetaDataFactory::createMetaData(docinfo->rev_meta);
1817
1818    if (sctx->valFilter != ValueFilter::KEYS_ONLY) {
1819        couchstore_open_options openOptions = 0;
1820
1821        /**
1822         * If the stored document has V0 metdata (no datatype)
1823         * or no special request is made to retrieve compressed documents
1824         * as is, then DECOMPRESS the document and update datatype
1825         */
1826        if (docinfo->rev_meta.size == metadata->getMetaDataSize(MetaData::Version::V0) ||
1827            sctx->valFilter == ValueFilter::VALUES_DECOMPRESSED) {
1828            openOptions = DECOMPRESS_DOC_BODIES;
1829        }
1830
1831        auto errCode = couchstore_open_doc_with_docinfo(db, docinfo, &doc,
1832                                                        openOptions);
1833
1834        if (errCode == COUCHSTORE_SUCCESS) {
1835            value = doc->data;
1836            if (doc->data.size) {
1837                if ((openOptions & DECOMPRESS_DOC_BODIES) == 0) {
1838                    // We always store the document bodies compressed on disk,
1839                    // but now the client _wanted_ to fetch the document
1840                    // in a compressed mode.
1841                    // We've never stored the "compressed" flag on disk
1842                    // (as we don't keep items compressed in memory).
1843                    // Update the datatype flag for this item to
1844                    // reflect that it is compressed so that the
1845                    // receiver of the object may notice (Note:
1846                    // this is currently _ONLY_ happening via DCP
1847                     auto datatype = metadata->getDataType();
1848                     metadata->setDataType(datatype | PROTOCOL_BINARY_DATATYPE_SNAPPY);
1849                } else if (metadata->getVersionInitialisedFrom() == MetaData::Version::V0) {
1850                    // This is a super old version of a couchstore file.
1851                    // Try to determine if the document is JSON or raw bytes
1852                    metadata->setDataType(determine_datatype(doc->data));
1853                }
1854            } else {
1855                // No data, it cannot have a datatype!
1856                metadata->setDataType(PROTOCOL_BINARY_RAW_BYTES);
1857            }
1858        } else if (errCode != COUCHSTORE_ERROR_DOC_NOT_FOUND) {
1859            sctx->logger->log(EXTENSION_LOG_WARNING,
1860                              "CouchKVStore::recordDbDump: "
1861                              "couchstore_open_doc_with_docinfo error:%s [%s], "
1862                              "vb:%" PRIu16 ", seqno:%" PRIu64,
1863                              couchstore_strerror(errCode),
1864                              couchkvstore_strerrno(db, errCode).c_str(),
1865                              vbucketId, docinfo->rev_seq);
1866            return COUCHSTORE_SUCCESS;
1867        }
1868    }
1869
1870    // Collections: TODO: Permanently restore to stored namespace
1871    auto it = std::make_unique<Item>(
1872            DocKey(makeDocKey(key, sctx->config.shouldPersistDocNamespace())),
1873            metadata->getFlags(),
1874            metadata->getExptime(),
1875            value.buf,
1876            value.size,
1877            metadata->getDataType(),
1878            metadata->getCas(),
1879            docinfo->db_seq, // return seq number being persisted on disk
1880            vbucketId,
1881            docinfo->rev_seq);
1882
1883    if (docinfo->deleted) {
1884        it->setDeleted();
1885    }
1886
1887    bool onlyKeys = (sctx->valFilter == ValueFilter::KEYS_ONLY) ? true : false;
1888    GetValue rv(std::move(it), ENGINE_SUCCESS, -1, onlyKeys);
1889    cb->callback(rv);
1890
1891    couchstore_free_document(doc);
1892
1893    if (cb->getStatus() == ENGINE_ENOMEM) {
1894        return COUCHSTORE_ERROR_CANCEL;
1895    }
1896
1897    sctx->lastReadSeqno = byseqno;
1898    return COUCHSTORE_SUCCESS;
1899}
1900
1901bool CouchKVStore::commit2couchstore(const Item* collectionsManifest) {
1902    bool success = true;
1903
1904    size_t pendingCommitCnt = pendingReqsQ.size();
1905    if (pendingCommitCnt == 0 && !collectionsManifest) {
1906        return success;
1907    }
1908
1909    // Use the vbucket of the first item or the manifest item
1910    uint16_t vbucket2flush = pendingCommitCnt
1911                                     ? pendingReqsQ[0]->getVBucketId()
1912                                     : collectionsManifest->getVBucketId();
1913
1914    TRACE_EVENT2("CouchKVStore",
1915                 "commit2couchstore",
1916                 "vbid",
1917                 vbucket2flush,
1918                 "pendingCommitCnt",
1919                 pendingCommitCnt);
1920
1921    // When an item and a manifest are present, vbucket2flush is read from the
1922    // item. Check it matches the manifest
1923    if (pendingCommitCnt && collectionsManifest &&
1924        vbucket2flush != collectionsManifest->getVBucketId()) {
1925        throw std::logic_error(
1926                "CouchKVStore::commit2couchstore: manifest/item vbucket "
1927                "mismatch vbucket2flush:" +
1928                std::to_string(vbucket2flush) + " manifest vb:" +
1929                std::to_string(collectionsManifest->getVBucketId()));
1930    }
1931
1932    std::vector<Doc*> docs(pendingCommitCnt);
1933    std::vector<DocInfo*> docinfos(pendingCommitCnt);
1934
1935    for (size_t i = 0; i < pendingCommitCnt; ++i) {
1936        CouchRequest *req = pendingReqsQ[i];
1937        docs[i] = (Doc *)req->getDbDoc();
1938        docinfos[i] = req->getDbDocInfo();
1939        if (vbucket2flush != req->getVBucketId()) {
1940            throw std::logic_error(
1941                    "CouchKVStore::commit2couchstore: "
1942                    "mismatch between vbucket2flush (which is "
1943                    + std::to_string(vbucket2flush) + ") and pendingReqsQ["
1944                    + std::to_string(i) + "] (which is "
1945                    + std::to_string(req->getVBucketId()) + ")");
1946        }
1947    }
1948
1949    // The docinfo callback needs to know if the DocNamespace feature is on
1950    kvstats_ctx kvctx(configuration.shouldPersistDocNamespace());
1951    // flush all
1952    couchstore_error_t errCode =
1953            saveDocs(vbucket2flush, docs, docinfos, kvctx, collectionsManifest);
1954
1955    if (errCode) {
1956        success = false;
1957        logger.log(EXTENSION_LOG_WARNING,
1958                   "CouchKVStore::commit2couchstore: saveDocs error:%s, "
1959                   "vb:%" PRIu16,
1960                   couchstore_strerror(errCode),
1961                   vbucket2flush);
1962    }
1963
1964    commitCallback(pendingReqsQ, kvctx, errCode);
1965
1966    // clean up
1967    for (size_t i = 0; i < pendingCommitCnt; ++i) {
1968        delete pendingReqsQ[i];
1969    }
1970    pendingReqsQ.clear();
1971    return success;
1972}
1973
1974static int readDocInfos(Db *db, DocInfo *docinfo, void *ctx) {
1975    if (ctx == nullptr) {
1976        throw std::invalid_argument("readDocInfos: ctx must be non-NULL");
1977    }
1978    if (!docinfo) {
1979        throw std::invalid_argument("readDocInfos: docInfo must be non-NULL");
1980    }
1981    kvstats_ctx* cbCtx = static_cast<kvstats_ctx*>(ctx);
1982    if(docinfo) {
1983        // An item exists in the VB DB file.
1984        if (!docinfo->deleted) {
1985            // Collections: TODO: Permanently restore to stored namespace
1986            auto itr = cbCtx->keyStats.find(
1987                    makeDocKey(docinfo->id, cbCtx->persistDocNamespace));
1988            if (itr != cbCtx->keyStats.end()) {
1989                itr->second = true;
1990            }
1991        }
1992    }
1993    return 0;
1994}
1995
1996couchstore_error_t CouchKVStore::saveDocs(uint16_t vbid,
1997                                          const std::vector<Doc*>& docs,
1998                                          std::vector<DocInfo*>& docinfos,
1999                                          kvstats_ctx& kvctx,
2000                                          const Item* collectionsManifest) {
2001    couchstore_error_t errCode;
2002    DbInfo info;
2003    DbHolder db(*this);
2004    errCode = openDB(vbid, db, COUCHSTORE_OPEN_FLAG_CREATE);
2005    if (errCode != COUCHSTORE_SUCCESS) {
2006        logger.log(EXTENSION_LOG_WARNING,
2007                   "CouchKVStore::saveDocs: openDB error:%s, vb:%" PRIu16
2008                   ", rev:%" PRIu64 ", numdocs:%" PRIu64,
2009                   couchstore_strerror(errCode),
2010                   vbid,
2011                   db.getFileRev(),
2012                   uint64_t(docs.size()));
2013        return errCode;
2014    } else {
2015        vbucket_state* state = getVBucketState(vbid);
2016        if (state == nullptr) {
2017            throw std::logic_error(
2018                    "CouchKVStore::saveDocs: cachedVBStates[" +
2019                    std::to_string(vbid) + "] is NULL");
2020        }
2021
2022        uint64_t maxDBSeqno = 0;
2023
2024        // Only do a couchstore_save_documents if there are docs
2025        if (docs.size() > 0) {
2026            std::vector<sized_buf> ids(docs.size());
2027            for (size_t idx = 0; idx < docs.size(); idx++) {
2028                ids[idx] = docinfos[idx]->id;
2029                maxDBSeqno = std::max(maxDBSeqno, docinfos[idx]->db_seq);
2030                DocKey key = makeDocKey(
2031                        ids[idx], configuration.shouldPersistDocNamespace());
2032                kvctx.keyStats[key] = false;
2033            }
2034            errCode = couchstore_docinfos_by_id(db,
2035                                                ids.data(),
2036                                                (unsigned)ids.size(),
2037                                                0,
2038                                                readDocInfos,
2039                                                &kvctx);
2040            if (errCode != COUCHSTORE_SUCCESS) {
2041                logger.log(EXTENSION_LOG_WARNING,
2042                           "CouchKVStore::saveDocs: couchstore_docinfos_by_id "
2043                           "error:%s [%s], vb:%" PRIu16 ", numdocs:%" PRIu64,
2044                           couchstore_strerror(errCode),
2045                           couchkvstore_strerrno(db, errCode).c_str(),
2046                           vbid,
2047                           uint64_t(docs.size()));
2048                return errCode;
2049            }
2050
2051            auto cs_begin = ProcessClock::now();
2052            uint64_t flags = COMPRESS_DOC_BODIES | COUCHSTORE_SEQUENCE_AS_IS;
2053            errCode = couchstore_save_documents(db,
2054                                                docs.data(),
2055                                                docinfos.data(),
2056                                                (unsigned)docs.size(),
2057                                                flags);
2058            st.saveDocsHisto.add(
2059                    std::chrono::duration_cast<std::chrono::microseconds>(
2060                            ProcessClock::now() - cs_begin));
2061            if (errCode != COUCHSTORE_SUCCESS) {
2062                logger.log(EXTENSION_LOG_WARNING,
2063                           "CouchKVStore::saveDocs: couchstore_save_documents "
2064                           "error:%s [%s], vb:%" PRIu16 ", numdocs:%" PRIu64,
2065                           couchstore_strerror(errCode),
2066                           couchkvstore_strerrno(db, errCode).c_str(),
2067                           vbid,
2068                           uint64_t(docs.size()));
2069                return errCode;
2070            }
2071        }
2072
2073        errCode = saveVBState(db, *state);
2074        if (errCode != COUCHSTORE_SUCCESS) {
2075            logger.log(EXTENSION_LOG_WARNING,
2076                       "CouchKVStore::saveDocs: saveVBState error:%s [%s]",
2077                       couchstore_strerror(errCode),
2078                       couchkvstore_strerrno(db, errCode).c_str());
2079            return errCode;
2080        }
2081
2082        if (collectionsManifest) {
2083            saveCollectionsManifest(*db, *collectionsManifest);
2084        }
2085
2086        auto cs_begin = ProcessClock::now();
2087        errCode = couchstore_commit(db);
2088        st.commitHisto.add(
2089                std::chrono::duration_cast<std::chrono::microseconds>(
2090                        ProcessClock::now() - cs_begin));
2091        if (errCode) {
2092            logger.log(
2093                    EXTENSION_LOG_WARNING,
2094                    "CouchKVStore::saveDocs: couchstore_commit error:%s [%s]",
2095                    couchstore_strerror(errCode),
2096                    couchkvstore_strerrno(db, errCode).c_str());
2097            return errCode;
2098        }
2099
2100        st.batchSize.add(docs.size());
2101
2102        // retrieve storage system stats for file fragmentation computation
2103        errCode = couchstore_db_info(db, &info);
2104        if (errCode) {
2105            logger.log(
2106                    EXTENSION_LOG_WARNING,
2107                    "CouchKVStore::saveDocs: couchstore_db_info error:%s [%s]",
2108                    couchstore_strerror(errCode),
2109                    couchkvstore_strerrno(db, errCode).c_str());
2110            return errCode;
2111        }
2112        cachedSpaceUsed[vbid] = info.space_used;
2113        cachedFileSize[vbid] = info.file_size;
2114        cachedDeleteCount[vbid] = info.deleted_count;
2115        cachedDocCount[vbid] = info.doc_count;
2116
2117        // Check seqno if we wrote documents
2118        if (docs.size() > 0 && maxDBSeqno != info.last_sequence) {
2119            logger.log(EXTENSION_LOG_WARNING,
2120                       "CouchKVStore::saveDocs: Seqno in db header (%" PRIu64 ")"
2121                       " is not matched with what was persisted (%" PRIu64 ")"
2122                       " for vb:%" PRIu16,
2123                       info.last_sequence, maxDBSeqno, vbid);
2124        }
2125        state->highSeqno = info.last_sequence;
2126    }
2127
2128    /* update stat */
2129    if(errCode == COUCHSTORE_SUCCESS) {
2130        st.docsCommitted = docs.size();
2131    }
2132
2133    return errCode;
2134}
2135
2136void CouchKVStore::commitCallback(std::vector<CouchRequest *> &committedReqs,
2137                                  kvstats_ctx &kvctx,
2138                                  couchstore_error_t errCode) {
2139    size_t commitSize = committedReqs.size();
2140
2141    for (size_t index = 0; index < commitSize; index++) {
2142        size_t dataSize = committedReqs[index]->getNBytes();
2143        size_t keySize = committedReqs[index]->getKey().size();
2144        /* update ep stats */
2145        ++st.io_num_write;
2146        st.io_write_bytes += (keySize + dataSize);
2147
2148        if (committedReqs[index]->isDelete()) {
2149            int rv = getMutationStatus(errCode);
2150            if (rv != -1) {
2151                const auto& key = committedReqs[index]->getKey();
2152                if (kvctx.keyStats[key]) {
2153                    rv = 1; // Deletion is for an existing item on DB file.
2154                } else {
2155                    rv = 0; // Deletion is for a non-existing item on DB file.
2156                }
2157            }
2158            if (errCode) {
2159                ++st.numDelFailure;
2160            } else {
2161                st.delTimeHisto.add(committedReqs[index]->getDelta());
2162            }
2163            committedReqs[index]->getDelCallback()->callback(*transactionCtx,
2164                                                             rv);
2165        } else {
2166            int rv = getMutationStatus(errCode);
2167            const auto& key = committedReqs[index]->getKey();
2168            bool insertion = !kvctx.keyStats[key];
2169            if (errCode) {
2170                ++st.numSetFailure;
2171            } else {
2172                st.writeTimeHisto.add(committedReqs[index]->getDelta());
2173                st.writeSizeHisto.add(dataSize + keySize);
2174            }
2175            mutation_result p(rv, insertion);
2176            committedReqs[index]->getSetCallback()->callback(*transactionCtx,
2177                                                             p);
2178        }
2179    }
2180}
2181
2182std::pair<CouchKVStore::ReadVBStateStatus, snapshot_info_t>
2183CouchKVStore::processVbstateSnapshot(uint16_t vb,
2184                                     vbucket_state_t state,
2185                                     int64_t version,
2186                                     const std::string& snapStart,
2187                                     const std::string& snapEnd,
2188                                     uint64_t highSeqno) {
2189    ReadVBStateStatus status = ReadVBStateStatus::Success;
2190
2191    // All upgrade paths we now expect start and end
2192    uint64_t start = 0, end = 0;
2193    if (!parseUint64(snapStart.c_str(), &start) ||
2194        !parseUint64(snapEnd.c_str(), &end)) {
2195        start = end = uint64_t(highSeqno);
2196    } else if (!(highSeqno >= start && highSeqno <= end)) {
2197        // very likely MB-34173, log this occurrence.
2198        // log the state, range and version
2199        logger.log(EXTENSION_LOG_WARNING,
2200                   "CouchKVStore::processVbstateSnapshot vb:%" PRIu16
2201                   " %s with invalid snapshot range. Found version:%" PRId64
2202                   ", "
2203                   "highSeqno:%" PRIu64 ", start:%" PRIu64 ", end:%" PRIu64,
2204                   vb,
2205                   VBucket::toString(state),
2206                   version,
2207                   highSeqno,
2208                   start,
2209                   end);
2210
2211        if (state == vbucket_state_active) {
2212            // Reset the snapshot range to match what the flusher would
2213            // normally set, that is start and end equal the high-seqno
2214            start = end = highSeqno;
2215        } else {
2216            // Flag that the VB is corrupt, it needs rebuilding
2217            status = ReadVBStateStatus::CorruptSnapshot;
2218            start = 0, end = 0;
2219        }
2220    }
2221
2222    return {status, snapshot_info_t{highSeqno, {start, end}}};
2223}
2224
2225CouchKVStore::ReadVBStateStatus CouchKVStore::readVBState(Db* db,
2226                                                          uint16_t vbId) {
2227    sized_buf id;
2228    LocalDoc *ldoc = NULL;
2229    ReadVBStateStatus status = ReadVBStateStatus::Success;
2230    vbucket_state_t state = vbucket_state_dead;
2231    uint64_t checkpointId = 0;
2232    uint64_t maxDeletedSeqno = 0;
2233    int64_t highSeqno = 0;
2234    std::string failovers;
2235    uint64_t purgeSeqno = 0;
2236    snapshot_info_t snapshot{0, {0, 0}};
2237    uint64_t maxCas = 0;
2238    int64_t hlcCasEpochSeqno = HlcCasSeqnoUninitialised;
2239    bool mightContainXattrs = false;
2240
2241    DbInfo info;
2242    auto couchStoreStatus = couchstore_db_info(db, &info);
2243    if (couchStoreStatus != COUCHSTORE_SUCCESS) {
2244        logger.log(EXTENSION_LOG_WARNING,
2245                   "CouchKVStore::readVBState: couchstore_db_info error:%s"
2246                   ", vb:%" PRIu16,
2247                   couchstore_strerror(couchStoreStatus),
2248                   vbId);
2249        return ReadVBStateStatus::CouchstoreError;
2250    }
2251
2252    highSeqno = info.last_sequence;
2253    purgeSeqno = info.purge_seq;
2254
2255    id.buf = (char *)"_local/vbstate";
2256    id.size = sizeof("_local/vbstate") - 1;
2257    couchStoreStatus =
2258            couchstore_open_local_document(db, (void*)id.buf, id.size, &ldoc);
2259    if (couchStoreStatus == COUCHSTORE_ERROR_DOC_NOT_FOUND) {
2260        logger.log(EXTENSION_LOG_NOTICE,
2261                   "CouchKVStore::readVBState: '_local/vbstate' not found "
2262                   "for vb:%d",
2263                   vbId);
2264    } else if (couchStoreStatus != COUCHSTORE_SUCCESS) {
2265        logger.log(EXTENSION_LOG_WARNING,
2266                   "CouchKVStore::readVBState: couchstore_open_local_document"
2267                   " error:%s, vb:%" PRIu16,
2268                   couchstore_strerror(couchStoreStatus),
2269                   vbId);
2270        return ReadVBStateStatus::CouchstoreError;
2271    }
2272
2273    if (couchStoreStatus == COUCHSTORE_SUCCESS) {
2274        const std::string statjson(ldoc->json.buf, ldoc->json.size);
2275        cJSON* jsonObj = cJSON_Parse(statjson.c_str());
2276        if (!jsonObj) {
2277            couchstore_free_local_document(ldoc);
2278            logger.log(EXTENSION_LOG_WARNING,
2279                       "CouchKVStore::readVBState: Failed to "
2280                       "parse the vbstat json doc for vb:%" PRIu16 ", json:%s",
2281                       vbId,
2282                       statjson.c_str());
2283            return ReadVBStateStatus::JsonInvalid;
2284        }
2285
2286        const auto version =
2287                getJSONObjInt64(cJSON_GetObjectItem(jsonObj, "version"));
2288        const std::string vb_state =
2289                getJSONObjString(cJSON_GetObjectItem(jsonObj, "state"));
2290        const std::string checkpoint_id =
2291                getJSONObjString(cJSON_GetObjectItem(jsonObj, "checkpoint_id"));
2292        const std::string max_deleted_seqno = getJSONObjString(
2293                cJSON_GetObjectItem(jsonObj, "max_deleted_seqno"));
2294        const std::string snapStart =
2295                getJSONObjString(cJSON_GetObjectItem(jsonObj, "snap_start"));
2296        const std::string snapEnd =
2297                getJSONObjString(cJSON_GetObjectItem(jsonObj, "snap_end"));
2298        const std::string maxCasValue =
2299                getJSONObjString(cJSON_GetObjectItem(jsonObj, "max_cas"));
2300        const std::string hlcCasEpoch =
2301                getJSONObjString(cJSON_GetObjectItem(jsonObj, "hlc_epoch"));
2302        mightContainXattrs = getJSONObjBool(
2303                cJSON_GetObjectItem(jsonObj, "might_contain_xattrs"));
2304
2305        cJSON* failover_json = cJSON_GetObjectItem(jsonObj, "failover_table");
2306        if (vb_state.compare("") == 0 || checkpoint_id.compare("") == 0 ||
2307            max_deleted_seqno.compare("") == 0) {
2308            logger.log(EXTENSION_LOG_WARNING,
2309                       "CouchKVStore::readVBState: State"
2310                       " JSON doc for vb:%" PRIu16
2311                       " is in the wrong format:%s, "
2312                       "vb state:%s, checkpoint id:%s and max deleted seqno:%s",
2313                       vbId,
2314                       statjson.c_str(),
2315                       vb_state.c_str(),
2316                       checkpoint_id.c_str(),
2317                       max_deleted_seqno.c_str());
2318        } else {
2319            state = VBucket::fromString(vb_state.c_str());
2320            parseUint64(max_deleted_seqno.c_str(), &maxDeletedSeqno);
2321            parseUint64(checkpoint_id.c_str(), &checkpointId);
2322
2323            if (maxCasValue.compare("") != 0) {
2324                parseUint64(maxCasValue.c_str(), &maxCas);
2325
2326                // MB-17517: If the maxCas on disk was invalid then don't use it
2327                // - instead rebuild from the items we load from disk (i.e. as
2328                // per an upgrade from an earlier version).
2329                if (maxCas == static_cast<uint64_t>(-1)) {
2330                    logger.log(EXTENSION_LOG_WARNING,
2331                               "CouchKVStore::readVBState: Invalid max_cas "
2332                               "(0x%" PRIx64 ") read from '%s' for vb:%" PRIu16
2333                               ". Resetting max_cas to zero.",
2334                               maxCas,
2335                               id.buf,
2336                               vbId);
2337                    maxCas = 0;
2338                }
2339            }
2340
2341            if (!hlcCasEpoch.empty()) {
2342                parseInt64(hlcCasEpoch.c_str(), &hlcCasEpochSeqno);
2343            }
2344
2345            if (failover_json) {
2346                failovers = to_string(failover_json, false);
2347            }
2348
2349            std::tie(status, snapshot) =
2350                    processVbstateSnapshot(vbId,
2351                                           state,
2352                                           version,
2353                                           snapStart,
2354                                           snapEnd,
2355                                           uint64_t(highSeqno));
2356        }
2357
2358        cJSON_Delete(jsonObj);
2359        couchstore_free_local_document(ldoc);
2360    }
2361
2362    // If snapshot is not initialised do not create a vbucket state
2363    if (status == ReadVBStateStatus::Success) {
2364        cachedVBStates[vbId] =
2365                std::make_unique<vbucket_state>(state,
2366                                                checkpointId,
2367                                                maxDeletedSeqno,
2368                                                snapshot.start,
2369                                                purgeSeqno,
2370                                                snapshot.range.start,
2371                                                snapshot.range.end,
2372                                                maxCas,
2373                                                hlcCasEpochSeqno,
2374                                                mightContainXattrs,
2375                                                failovers);
2376    }
2377    return status;
2378}
2379
2380couchstore_error_t CouchKVStore::saveVBState(Db *db,
2381                                             const vbucket_state &vbState) {
2382    std::stringstream jsonState;
2383
2384    jsonState << "{\"state\": \"" << VBucket::toString(vbState.state) << "\""
2385              << ",\"checkpoint_id\": \"" << vbState.checkpointId << "\""
2386              << ",\"max_deleted_seqno\": \"" << vbState.maxDeletedSeqno << "\"";
2387    if (!vbState.failovers.empty()) {
2388        jsonState << ",\"failover_table\": " << vbState.failovers;
2389    }
2390    jsonState << ",\"snap_start\": \"" << vbState.lastSnapStart << "\""
2391              << ",\"snap_end\": \"" << vbState.lastSnapEnd << "\""
2392              << ",\"max_cas\": \"" << vbState.maxCas << "\""
2393              << ",\"hlc_epoch\": \"" << vbState.hlcCasEpochSeqno << "\""
2394              << ",\"version\": 2";
2395
2396    if (vbState.mightContainXattrs) {
2397        jsonState << ",\"might_contain_xattrs\": true";
2398    } else {
2399        jsonState << ",\"might_contain_xattrs\": false";
2400    }
2401
2402    jsonState << "}";
2403
2404    LocalDoc lDoc;
2405    lDoc.id.buf = (char *)"_local/vbstate";
2406    lDoc.id.size = sizeof("_local/vbstate") - 1;
2407    std::string state = jsonState.str();
2408    lDoc.json.buf = (char *)state.c_str();
2409    lDoc.json.size = state.size();
2410    lDoc.deleted = 0;
2411
2412    couchstore_error_t errCode = couchstore_save_local_document(db, &lDoc);
2413    if (errCode != COUCHSTORE_SUCCESS) {
2414        logger.log(EXTENSION_LOG_WARNING,
2415                   "CouchKVStore::saveVBState couchstore_save_local_document "
2416                   "error:%s [%s]", couchstore_strerror(errCode),
2417                   couchkvstore_strerrno(db, errCode).c_str());
2418    }
2419
2420    return errCode;
2421}
2422
2423couchstore_error_t CouchKVStore::saveCollectionsManifest(
2424        Db& db, const Item& collectionsManifest) {
2425    LocalDoc lDoc;
2426    lDoc.id.buf = const_cast<char*>(Collections::CouchstoreManifest);
2427    lDoc.id.size = Collections::CouchstoreManifestLen;
2428
2429    // Convert the Item into JSON
2430    std::string state =
2431            Collections::VB::Manifest::serialToJson(collectionsManifest);
2432
2433    lDoc.json.buf = const_cast<char*>(state.c_str());
2434    lDoc.json.size = state.size();
2435    lDoc.deleted = 0;
2436
2437    couchstore_error_t errCode = couchstore_save_local_document(&db, &lDoc);
2438
2439    if (errCode != COUCHSTORE_SUCCESS) {
2440        logger.log(EXTENSION_LOG_WARNING,
2441                   "CouchKVStore::saveCollectionsManifest "
2442                   "couchstore_save_local_document "
2443                   "error:%s [%s]",
2444                   couchstore_strerror(errCode),
2445                   couchkvstore_strerrno(&db, errCode).c_str());
2446    }
2447
2448    return errCode;
2449}
2450
2451std::string CouchKVStore::readCollectionsManifest(Db& db) {
2452    sized_buf id;
2453    id.buf = const_cast<char*>(Collections::CouchstoreManifest);
2454    id.size = sizeof(Collections::CouchstoreManifest) - 1;
2455
2456    LocalDocHolder lDoc;
2457    auto errCode = couchstore_open_local_document(
2458            &db, (void*)id.buf, id.size, lDoc.getLocalDocAddress());
2459    if (errCode != COUCHSTORE_SUCCESS) {
2460        if (errCode == COUCHSTORE_ERROR_DOC_NOT_FOUND) {
2461            logger.log(EXTENSION_LOG_NOTICE,
2462                   "CouchKVStore::readCollectionsManifest: doc not found");
2463        } else {
2464            logger.log(EXTENSION_LOG_WARNING,
2465                   "CouchKVStore::readCollectionsManifest: "
2466                   "couchstore_open_local_document error:%s",
2467                   couchstore_strerror(errCode));
2468        }
2469
2470        return {};
2471    }
2472
2473    return {lDoc.getLocalDoc()->json.buf, lDoc.getLocalDoc()->json.size};
2474}
2475
2476int CouchKVStore::getMultiCb(Db *db, DocInfo *docinfo, void *ctx) {
2477    if (docinfo == nullptr) {
2478        throw std::invalid_argument("CouchKVStore::getMultiCb: docinfo "
2479                "must be non-NULL");
2480    }
2481    if (ctx == nullptr) {
2482        throw std::invalid_argument("CouchKVStore::getMultiCb: ctx must "
2483                "be non-NULL");
2484    }
2485
2486    GetMultiCbCtx *cbCtx = static_cast<GetMultiCbCtx *>(ctx);
2487    // Collections: TODO: Permanently restore to stored namespace
2488    DocKey key = makeDocKey(docinfo->id,
2489                            cbCtx->cks.getConfig().shouldPersistDocNamespace());
2490    KVStoreStats& st = cbCtx->cks.getKVStoreStat();
2491
2492    vb_bgfetch_queue_t::iterator qitr = cbCtx->fetches.find(key);
2493    if (qitr == cbCtx->fetches.end()) {
2494        // this could be a serious race condition in couchstore,
2495        // log a warning message and continue
2496        cbCtx->cks.logger.log(EXTENSION_LOG_WARNING,
2497                              "CouchKVStore::getMultiCb: Couchstore returned "
2498                              "invalid docinfo, no pending bgfetch has been "
2499                              "issued for a key in vb:%" PRIu16 ", "
2500                              "seqno:%" PRIu64, cbCtx->vbId, docinfo->rev_seq);
2501        return 0;
2502    }
2503
2504    vb_bgfetch_item_ctx_t& bg_itm_ctx = (*qitr).second;
2505    GetMetaOnly meta_only = bg_itm_ctx.isMetaOnly;
2506
2507    couchstore_error_t errCode = cbCtx->cks.fetchDoc(
2508            db, docinfo, bg_itm_ctx.value, cbCtx->vbId, meta_only);
2509    if (errCode != COUCHSTORE_SUCCESS && (meta_only == GetMetaOnly::No)) {
2510        st.numGetFailure++;
2511    }
2512
2513    bg_itm_ctx.value.setStatus(cbCtx->cks.couchErr2EngineErr(errCode));
2514
2515    bool return_val_ownership_transferred = false;
2516    for (auto& fetch : bg_itm_ctx.bgfetched_list) {
2517        return_val_ownership_transferred = true;
2518        // populate return value for remaining fetch items with the
2519        // same seqid
2520        fetch->value = &bg_itm_ctx.value;
2521        st.readTimeHisto.add(
2522                std::chrono::duration_cast<std::chrono::microseconds>(
2523                        ProcessClock::now() - fetch->initTime));
2524        if (errCode == COUCHSTORE_SUCCESS) {
2525            st.readSizeHisto.add(bg_itm_ctx.value.item->getKey().size() +
2526                                 bg_itm_ctx.value.item->getNBytes());
2527        }
2528    }
2529    if (!return_val_ownership_transferred) {
2530        cbCtx->cks.logger.log(EXTENSION_LOG_WARNING,
2531                              "CouchKVStore::getMultiCb called with zero"
2532                              "items in bgfetched_list, vb:%" PRIu16
2533                              ", seqno:%" PRIu64,
2534                              cbCtx->vbId, docinfo->rev_seq);
2535    }
2536
2537    return 0;
2538}
2539
2540
2541void CouchKVStore::closeDatabaseHandle(Db *db) {
2542    couchstore_error_t ret = couchstore_close_file(db);
2543    if (ret != COUCHSTORE_SUCCESS) {
2544        logger.log(EXTENSION_LOG_WARNING,
2545                   "CouchKVStore::closeDatabaseHandle: couchstore_close_file "
2546                   "error:%s [%s]", couchstore_strerror(ret),
2547                   couchkvstore_strerrno(db, ret).c_str());
2548    }
2549    ret = couchstore_free_db(db);
2550    if (ret != COUCHSTORE_SUCCESS) {
2551        logger.log(EXTENSION_LOG_WARNING,
2552                   "CouchKVStore::closeDatabaseHandle: couchstore_free_db "
2553                   "error:%s [%s]", couchstore_strerror(ret),
2554                   couchkvstore_strerrno(nullptr, ret).c_str());
2555    }
2556    st.numClose++;
2557}
2558
2559ENGINE_ERROR_CODE CouchKVStore::couchErr2EngineErr(couchstore_error_t errCode) {
2560    switch (errCode) {
2561    case COUCHSTORE_SUCCESS:
2562        return ENGINE_SUCCESS;
2563    case COUCHSTORE_ERROR_ALLOC_FAIL:
2564        return ENGINE_ENOMEM;
2565    case COUCHSTORE_ERROR_DOC_NOT_FOUND:
2566        return ENGINE_KEY_ENOENT;
2567    case COUCHSTORE_ERROR_NO_SUCH_FILE:
2568    case COUCHSTORE_ERROR_NO_HEADER:
2569    default:
2570        // same as the general error return code of
2571        // EPBucket::getInternal
2572        return ENGINE_TMPFAIL;
2573    }
2574}
2575
2576size_t CouchKVStore::getNumPersistedDeletes(uint16_t vbid) {
2577    size_t delCount = cachedDeleteCount[vbid];
2578    if (delCount != (size_t) -1) {
2579        return delCount;
2580    }
2581
2582    DbHolder db(*this);
2583    couchstore_error_t errCode = openDB(vbid, db, COUCHSTORE_OPEN_FLAG_RDONLY);
2584    if (errCode == COUCHSTORE_SUCCESS) {
2585        DbInfo info;
2586        errCode = couchstore_db_info(db, &info);
2587        if (errCode == COUCHSTORE_SUCCESS) {
2588            cachedDeleteCount[vbid] = info.deleted_count;
2589            return info.deleted_count;
2590        } else {
2591            throw std::runtime_error(
2592                    "CouchKVStore::getNumPersistedDeletes:"
2593                    "Failed to read database info for vBucket = " +
2594                    std::to_string(vbid) + " rev = " +
2595                    std::to_string(db.getFileRev()) + " with error:" +
2596                    couchstore_strerror(errCode));
2597        }
2598    } else {
2599        // open failed - map couchstore error code to exception.
2600        std::errc ec;
2601        switch (errCode) {
2602            case COUCHSTORE_ERROR_OPEN_FILE:
2603                ec = std::errc::no_such_file_or_directory; break;
2604            default:
2605                ec = std::errc::io_error; break;
2606        }
2607        throw std::system_error(std::make_error_code(ec),
2608                                "CouchKVStore::getNumPersistedDeletes:"
2609                                "Failed to open database file for vBucket = " +
2610                                        std::to_string(vbid) + " rev = " +
2611                                        std::to_string(db.getFileRev()) +
2612                                        " with error:" +
2613                                        couchstore_strerror(errCode));
2614    }
2615    return 0;
2616}
2617
2618DBFileInfo CouchKVStore::getDbFileInfo(uint16_t vbid) {
2619
2620    DbInfo info = getDbInfo(vbid);
2621    return DBFileInfo{info.file_size, info.space_used};
2622}
2623
2624DBFileInfo CouchKVStore::getAggrDbFileInfo() {
2625    DBFileInfo kvsFileInfo;
2626    /**
2627     * Iterate over all the vbuckets to get the total.
2628     * If the vbucket is dead, then its value would
2629     * be zero.
2630     */
2631    for (uint16_t vbid = 0; vbid < numDbFiles; vbid++) {
2632        kvsFileInfo.fileSize += cachedFileSize[vbid].load();
2633        kvsFileInfo.spaceUsed += cachedSpaceUsed[vbid].load();
2634    }
2635    return kvsFileInfo;
2636}
2637
2638size_t CouchKVStore::getNumItems(uint16_t vbid, uint64_t min_seq,
2639                                 uint64_t max_seq) {
2640    DbHolder db(*this);
2641    uint64_t count = 0;
2642    couchstore_error_t errCode = openDB(vbid, db, COUCHSTORE_OPEN_FLAG_RDONLY);
2643    if (errCode == COUCHSTORE_SUCCESS) {
2644        errCode = couchstore_changes_count(db, min_seq, max_seq, &count);
2645        if (errCode != COUCHSTORE_SUCCESS) {
2646            throw std::runtime_error(
2647                    "CouchKVStore::getNumItems: Failed to "
2648                    "get changes count for vBucket = " +
2649                    std::to_string(vbid) + " rev = " +
2650                    std::to_string(db.getFileRev()) + " with error:" +
2651                    couchstore_strerror(errCode));
2652        }
2653    } else {
2654        throw std::invalid_argument(
2655                "CouchKVStore::getNumItems: Failed to "
2656                "open database file for vBucket = " +
2657                std::to_string(vbid) + " rev = " +
2658                std::to_string(db.getFileRev()) + " with error:" +
2659                couchstore_strerror(errCode));
2660    }
2661    return count;
2662}
2663
2664size_t CouchKVStore::getItemCount(uint16_t vbid) {
2665    if (!isReadOnly()) {
2666        return cachedDocCount.at(vbid);
2667    }
2668    return getDbInfo(vbid).doc_count;
2669}
2670
2671RollbackResult CouchKVStore::rollback(uint16_t vbid, uint64_t rollbackSeqno,
2672                                      std::shared_ptr<RollbackCB> cb) {
2673    DbHolder db(*this);
2674    DbInfo info;
2675    couchstore_error_t errCode;
2676
2677    // Open the vbucket's file and determine the latestSeqno persisted.
2678    errCode = openDB(vbid, db, (uint64_t)COUCHSTORE_OPEN_FLAG_RDONLY);
2679    std::stringstream dbFileName;
2680    dbFileName << dbname << "/" << vbid << ".couch." << db.getFileRev();
2681
2682    if (errCode == COUCHSTORE_SUCCESS) {
2683        errCode = couchstore_db_info(db, &info);
2684        if (errCode != COUCHSTORE_SUCCESS) {
2685            logger.log(EXTENSION_LOG_WARNING,
2686                       "CouchKVStore::rollback: couchstore_db_info error:%s, "
2687                       "name:%s", couchstore_strerror(errCode),
2688                       dbFileName.str().c_str());
2689            return RollbackResult(false, 0, 0, 0);
2690        }
2691    } else {
2692        logger.log(EXTENSION_LOG_WARNING,
2693                   "CouchKVStore::rollback: openDB error:%s, name:%s",
2694                   couchstore_strerror(errCode), dbFileName.str().c_str());
2695        return RollbackResult(false, 0, 0, 0);
2696    }
2697    uint64_t latestSeqno = info.last_sequence;
2698
2699    // Count how many updates are in the vbucket's file. We'll later compare
2700    // this with how many items must be discarded and hence decide if it is
2701    // better to discard everything and start from an empty vBucket.
2702    uint64_t totSeqCount = 0;
2703    errCode = couchstore_changes_count(db, 0, latestSeqno, &totSeqCount);
2704    if (errCode != COUCHSTORE_SUCCESS) {
2705        logger.log(EXTENSION_LOG_WARNING,
2706                   "CouchKVStore::rollback: "
2707                   "couchstore_changes_count(0, %" PRIu64
2708                   ") error:%s [%s], "
2709                   "vb:%" PRIu16 ", rev:%" PRIu64,
2710                   latestSeqno,
2711                   couchstore_strerror(errCode),
2712                   cb_strerror().c_str(),
2713                   vbid,
2714                   db.getFileRev());
2715        return RollbackResult(false, 0, 0, 0);
2716    }
2717
2718    // Open the vBucket file again; and search for a header which is
2719    // before the requested rollback point - the Rollback Header.
2720    DbHolder newdb(*this);
2721    errCode = openDB(vbid, newdb, 0);
2722    if (errCode != COUCHSTORE_SUCCESS) {
2723        logger.log(EXTENSION_LOG_WARNING,
2724                   "CouchKVStore::rollback: openDB#2 error:%s, name:%s",
2725                   couchstore_strerror(errCode), dbFileName.str().c_str());
2726        return RollbackResult(false, 0, 0, 0);
2727    }
2728
2729    while (info.last_sequence > rollbackSeqno) {
2730        errCode = couchstore_rewind_db_header(newdb);
2731        if (errCode != COUCHSTORE_SUCCESS) {
2732            // rewind_db_header cleans up (frees DB) on error; so
2733            // release db in DbHolder to prevent a double-free.
2734            newdb.releaseDb();
2735            logger.log(EXTENSION_LOG_WARNING,
2736                       "CouchKVStore::rollback: couchstore_rewind_db_header "
2737                       "error:%s [%s], vb:%" PRIu16 ", latestSeqno:%" PRIu64
2738                       ", rollbackSeqno:%" PRIu64,
2739                       couchstore_strerror(errCode), cb_strerror().c_str(),
2740                       vbid, latestSeqno, rollbackSeqno);
2741            //Reset the vbucket and send the entire snapshot,
2742            //as a previous header wasn't found.
2743            return RollbackResult(false, 0, 0, 0);
2744        }
2745        errCode = couchstore_db_info(newdb, &info);
2746        if (errCode != COUCHSTORE_SUCCESS) {
2747            logger.log(EXTENSION_LOG_WARNING,
2748                       "CouchKVStore::rollback: couchstore_db_info error:%s, "
2749                       "name:%s", couchstore_strerror(errCode),
2750                       dbFileName.str().c_str());
2751            return RollbackResult(false, 0, 0, 0);
2752        }
2753    }
2754
2755    // Count how many updates we need to discard to rollback to the Rollback
2756    // Header. If this is too many; then prefer to discard everything (than
2757    // have to patch up a large amount of in-memory data).
2758    uint64_t rollbackSeqCount = 0;
2759    errCode = couchstore_changes_count(
2760            db, info.last_sequence, latestSeqno, &rollbackSeqCount);
2761    if (errCode != COUCHSTORE_SUCCESS) {
2762        logger.log(EXTENSION_LOG_WARNING,
2763                   "CouchKVStore::rollback: "
2764                   "couchstore_changes_count#2(%" PRIu64 ", %" PRIu64
2765                   ") "
2766                   "error:%s [%s], vb:%" PRIu16 ", rev:%" PRIu64,
2767                   info.last_sequence,
2768                   latestSeqno,
2769                   couchstore_strerror(errCode),
2770                   cb_strerror().c_str(),
2771                   vbid,
2772                   db.getFileRev());
2773        return RollbackResult(false, 0, 0, 0);
2774    }
2775    if ((totSeqCount / 2) <= rollbackSeqCount) {
2776        //doresetVbucket flag set or rollback is greater than 50%,
2777        //reset the vbucket and send the entire snapshot
2778        return RollbackResult(false, 0, 0, 0);
2779    }
2780
2781    // We have decided to perform a rollback to the Rollback Header.
2782    // Iterate across the series of keys which have been updated /since/ the
2783    // Rollback Header; invoking a callback on each. This allows the caller to
2784    // then inspect the state of the given key in the Rollback Header, and
2785    // correct the in-memory view:
2786    // * If the key is not present in the Rollback header then delete it from
2787    //   the HashTable (if either didn't exist yet, or had previously been
2788    //   deleted in the Rollback header).
2789    // * If the key is present in the Rollback header then replace the in-memory
2790    // value with the value from the Rollback header.
2791    cb->setDbHeader(newdb);
2792    auto cl = std::make_shared<NoLookupCallback>();
2793    ScanContext* ctx = initScanContext(cb, cl, vbid, info.last_sequence+1,
2794                                       DocumentFilter::ALL_ITEMS,
2795                                       ValueFilter::KEYS_ONLY);
2796    scan_error_t error = scan(ctx);
2797    destroyScanContext(ctx);
2798
2799    if (error != scan_success) {
2800        return RollbackResult(false, 0, 0, 0);
2801    }
2802
2803    if (readVBState(newdb, vbid) != ReadVBStateStatus::Success) {
2804        return RollbackResult(false, 0, 0, 0);
2805    }
2806    cachedDeleteCount[vbid] = info.deleted_count;
2807    cachedDocCount[vbid] = info.doc_count;
2808
2809    //Append the rewinded header to the database file
2810    errCode = couchstore_commit(newdb);
2811
2812    if (errCode != COUCHSTORE_SUCCESS) {
2813        return RollbackResult(false, 0, 0, 0);
2814    }
2815
2816    vbucket_state* vb_state = getVBucketState(vbid);
2817    return RollbackResult(true, vb_state->highSeqno,
2818                          vb_state->lastSnapStart, vb_state->lastSnapEnd);
2819}
2820
2821int populateAllKeys(Db *db, DocInfo *docinfo, void *ctx) {
2822    AllKeysCtx *allKeysCtx = (AllKeysCtx *)ctx;
2823    // Collections: TODO: Restore to stored namespace
2824    DocKey key = makeDocKey(docinfo->id, false /*restore namespace*/);
2825    (allKeysCtx->cb)->callback(key);
2826    if (--(allKeysCtx->count) <= 0) {
2827        //Only when count met is less than the actual number of entries
2828        return COUCHSTORE_ERROR_CANCEL;
2829    }
2830    return COUCHSTORE_SUCCESS;
2831}
2832
2833ENGINE_ERROR_CODE
2834CouchKVStore::getAllKeys(uint16_t vbid,
2835                         const DocKey start_key,
2836                         uint32_t count,
2837                         std::shared_ptr<Callback<const DocKey&>> cb) {
2838    DbHolder db(*this);
2839    couchstore_error_t errCode = openDB(vbid, db, COUCHSTORE_OPEN_FLAG_RDONLY);
2840    if(errCode == COUCHSTORE_SUCCESS) {
2841        sized_buf ref = {NULL, 0};
2842        ref.buf = (char*) start_key.data();
2843        ref.size = start_key.size();
2844        AllKeysCtx ctx(cb, count);
2845        errCode = couchstore_all_docs(db,
2846                                      &ref,
2847                                      COUCHSTORE_NO_DELETES,
2848                                      populateAllKeys,
2849                                      static_cast<void*>(&ctx));
2850        if (errCode == COUCHSTORE_SUCCESS ||
2851                errCode == COUCHSTORE_ERROR_CANCEL)  {
2852            return ENGINE_SUCCESS;
2853        } else {
2854            logger.log(EXTENSION_LOG_WARNING,
2855                       "CouchKVStore::getAllKeys: couchstore_all_docs "
2856                       "error:%s [%s] vb:%" PRIu16 ", rev:%" PRIu64,
2857                       couchstore_strerror(errCode),
2858                       cb_strerror().c_str(),
2859                       vbid,
2860                       db.getFileRev());
2861        }
2862    } else {
2863        logger.log(EXTENSION_LOG_WARNING,
2864                   "CouchKVStore::getAllKeys: openDB error:%s, vb:%" PRIu16
2865                   ", rev:%" PRIu64,
2866                   couchstore_strerror(errCode),
2867                   vbid,
2868                   db.getFileRev());
2869    }
2870    return ENGINE_FAILED;
2871}
2872
2873void CouchKVStore::unlinkCouchFile(uint16_t vbucket,
2874                                   uint64_t fRev) {
2875
2876    if (isReadOnly()) {
2877        throw std::logic_error("CouchKVStore::unlinkCouchFile: Not valid on a "
2878                "read-only object.");
2879    }
2880    char fname[PATH_MAX];
2881    try {
2882        checked_snprintf(fname, sizeof(fname), "%s/%d.couch.%" PRIu64,
2883                         dbname.c_str(), vbucket, fRev);
2884    } catch (std::exception&) {
2885        LOG(EXTENSION_LOG_WARNING,
2886            "CouchKVStore::unlinkCouchFile: Failed to build filename:%s",
2887            fname);
2888        return;
2889    }
2890
2891    if (remove(fname) == -1) {
2892        logger.log(EXTENSION_LOG_WARNING,
2893                   "CouchKVStore::unlinkCouchFile: remove error:%u, "
2894                   "vb:%" PRIu16 ", rev:%" PRIu64 ", fname:%s", errno, vbucket,
2895                   fRev, fname);
2896
2897        if (errno != ENOENT) {
2898            std::string file_str = fname;
2899            pendingFileDeletions.push(file_str);
2900        }
2901    }
2902}
2903
2904void CouchKVStore::removeCompactFile(const std::string& dbname, uint16_t vbid) {
2905    std::string dbfile = getDBFileName(dbname, vbid, (*dbFileRevMap)[vbid]);
2906    std::string compact_file = dbfile + ".compact";
2907
2908    if (!isReadOnly()) {
2909        removeCompactFile(compact_file);
2910    } else {
2911        logger.log(EXTENSION_LOG_WARNING,
2912                   "CouchKVStore::removeCompactFile: A read-only instance of "
2913                   "the underlying store was not allowed to delete a temporary"
2914                   "file: %s",
2915                   compact_file.c_str());
2916    }
2917}
2918
2919void CouchKVStore::removeCompactFile(const std::string &filename) {
2920    if (isReadOnly()) {
2921        throw std::logic_error("CouchKVStore::removeCompactFile: Not valid on "
2922                "a read-only object.");
2923    }
2924
2925    if (access(filename.c_str(), F_OK) == 0) {
2926        if (remove(filename.c_str()) == 0) {
2927            logger.log(EXTENSION_LOG_WARNING,
2928                       "CouchKVStore::removeCompactFile: Removed compact "
2929                       "filename:%s", filename.c_str());
2930        }
2931        else {
2932            logger.log(EXTENSION_LOG_WARNING,
2933                       "CouchKVStore::removeCompactFile: remove error:%s, "
2934                       "filename:%s", cb_strerror().c_str(), filename.c_str());
2935
2936            if (errno != ENOENT) {
2937                pendingFileDeletions.push(const_cast<std::string &>(filename));
2938            }
2939        }
2940    }
2941}
2942
2943std::string CouchKVStore::getCollectionsManifest(uint16_t vbid) {
2944    DbHolder db(*this);
2945
2946    // openDB logs error details
2947    couchstore_error_t errCode = openDB(vbid, db, COUCHSTORE_OPEN_FLAG_RDONLY);
2948    if (errCode != COUCHSTORE_SUCCESS) {
2949        return {};
2950    }
2951
2952    return readCollectionsManifest(*db);
2953}
2954
2955void CouchKVStore::incrementRevision(uint16_t vbid) {
2956    (*dbFileRevMap)[vbid]++;
2957}
2958
2959uint64_t CouchKVStore::prepareToDelete(uint16_t vbid) {
2960    // Clear the stats so it looks empty (real deletion of the disk data occurs
2961    // later)
2962    cachedDocCount[vbid] = 0;
2963    cachedDeleteCount[vbid] = 0;
2964    cachedFileSize[vbid] = 0;
2965    cachedSpaceUsed[vbid] = 0;
2966    return (*dbFileRevMap)[vbid];
2967}
2968
2969/* end of couch-kvstore.cc */
2970