1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2015 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "config.h"
19
20#ifdef _MSC_VER
21#define PATH_MAX MAX_PATH
22#endif
23
24#include <fcntl.h>
25#include <stdio.h>
26#include <string.h>
27#include <sys/stat.h>
28#include <sys/types.h>
29
30#include <algorithm>
31#include <cctype>
32#include <cstdlib>
33#include <fstream>
34#include <iostream>
35#include <list>
36#include <map>
37#include <phosphor/phosphor.h>
38#include <platform/cb_malloc.h>
39#include <platform/checked_snprintf.h>
40#include <string>
41#include <utility>
42#include <vector>
43#include <cJSON.h>
44#include <platform/dirutils.h>
45
46#include "common.h"
47#include "couch-kvstore/couch-kvstore.h"
48#include "ep_types.h"
49#include "kvstore_config.h"
50#include "statwriter.h"
51#include "vbucket.h"
52#include "vbucket_bgfetch_item.h"
53
54#include <JSON_checker.h>
55#include <kvstore.h>
56#include <platform/compress.h>
57
extern "C" {
    /// C-linkage shim: forwards its arguments unchanged to
    /// CouchKVStore::recordDbDump and returns that call's result.
    // NOTE(review): presumably registered as a couchstore callback elsewhere
    // in this file; the call site is not visible in this section.
    static int recordDbDumpC(Db *db, DocInfo *docinfo, void *ctx)
    {
        return CouchKVStore::recordDbDump(db, docinfo, ctx);
    }
}
64
extern "C" {
    /// C-linkage shim handed to couchstore_docinfos_by_id() by
    /// CouchKVStore::getMulti; forwards its arguments unchanged to
    /// CouchKVStore::getMultiCb and returns that call's result.
    static int getMultiCbC(Db *db, DocInfo *docinfo, void *ctx)
    {
        return CouchKVStore::getMultiCb(db, docinfo, ctx);
    }
}
71
72struct kvstats_ctx {
73    kvstats_ctx(bool persistDocNamespace)
74        : persistDocNamespace(persistDocNamespace) {
75    }
76    /// A map of key to bool. If true, the key exists in the VB datafile
77    std::unordered_map<StoredDocKey, bool> keyStats;
78    /// Collections: When enabled this means persisted keys have namespaces
79    bool persistDocNamespace;
80};
81
82static std::string getStrError(Db *db) {
83    const size_t max_msg_len = 256;
84    char msg[max_msg_len];
85    couchstore_last_os_error(db, msg, max_msg_len);
86    std::string errorStr(msg);
87    return errorStr;
88}
89
90/**
91 * Determine the datatype for a blob. It is _highly_ unlikely that
92 * this method is being called, as it would have to be for an item
93 * which is read off the disk _before_ we started to write the
94 * datatype to disk (we did that in a 3.x server).
95 *
96 * @param doc The document to check
97 * @return JSON or RAW bytes
98 */
99static protocol_binary_datatype_t determine_datatype(sized_buf doc) {
100    if (checkUTF8JSON(reinterpret_cast<uint8_t*>(doc.buf), doc.size)) {
101        return PROTOCOL_BINARY_DATATYPE_JSON;
102    } else {
103        return PROTOCOL_BINARY_RAW_BYTES;
104    }
105}
106
/**
 * Test whether a file name ends with the ".compact" suffix.
 *
 * @param filename name (or path) to inspect
 * @return true only if the final characters of filename are ".compact"
 */
static bool endWithCompact(const std::string &filename) {
    // Use the string's length, not sizeof() on the literal: sizeof counts
    // the terminating NUL and would be off by one.
    const std::string suffix(".compact");
    if (filename.size() < suffix.size()) {
        // Too short to contain the suffix; also avoids unsigned underflow
        // in the offset computation below.
        return false;
    }
    // Compare the tail only, so an earlier ".compact" inside the name
    // (e.g. "a.compact.compact") cannot confuse the check.
    return filename.compare(filename.size() - suffix.size(),
                            suffix.size(),
                            suffix) == 0;
}
115
116static void discoverDbFiles(const std::string &dir,
117                            std::vector<std::string> &v) {
118    auto files = cb::io::findFilesContaining(dir, ".couch");
119    std::vector<std::string>::iterator ii;
120    for (ii = files.begin(); ii != files.end(); ++ii) {
121        if (!endWithCompact(*ii)) {
122            v.push_back(*ii);
123        }
124    }
125}
126
127static int getMutationStatus(couchstore_error_t errCode) {
128    switch (errCode) {
129    case COUCHSTORE_SUCCESS:
130        return MUTATION_SUCCESS;
131    case COUCHSTORE_ERROR_NO_HEADER:
132    case COUCHSTORE_ERROR_NO_SUCH_FILE:
133    case COUCHSTORE_ERROR_DOC_NOT_FOUND:
134        // this return causes ep engine to drop the failed flush
135        // of an item since it does not know about the itme any longer
136        return DOC_NOT_FOUND;
137    default:
138        // this return causes ep engine to keep requeuing the failed
139        // flush of an item
140        return MUTATION_FAILED;
141    }
142}
143
/**
 * Check that a string consists solely of decimal digits.
 *
 * @param input string to inspect (not modified)
 * @return true if every character is a digit; an empty string yields true
 */
static bool allDigit(std::string &input) {
    // The lambda takes unsigned char on purpose: passing a negative plain
    // char (any byte >= 0x80 where char is signed) to isdigit() is
    // undefined behaviour.
    return std::all_of(input.begin(), input.end(), [](unsigned char ch) {
        return std::isdigit(ch) != 0;
    });
}
153
154static std::string couchkvstore_strerrno(Db *db, couchstore_error_t err) {
155    return (err == COUCHSTORE_ERROR_OPEN_FILE ||
156            err == COUCHSTORE_ERROR_READ ||
157            err == COUCHSTORE_ERROR_WRITE ||
158            err == COUCHSTORE_ERROR_FILE_CLOSE) ? getStrError(db) : "none";
159}
160
161static DocKey makeDocKey(const sized_buf buf, bool restoreNamespace) {
162    if (restoreNamespace) {
163        return DocKey(reinterpret_cast<const uint8_t*>(&buf.buf[1]),
164                      buf.size - 1,
165                      DocNamespace(buf.buf[0]));
166    } else {
167        return DocKey(reinterpret_cast<const uint8_t*>(buf.buf),
168                      buf.size,
169                      DocNamespace::DefaultCollection);
170    }
171}
172
173struct GetMultiCbCtx {
174    GetMultiCbCtx(CouchKVStore &c, uint16_t v, vb_bgfetch_queue_t &f) :
175        cks(c), vbId(v), fetches(f) {}
176
177    CouchKVStore &cks;
178    uint16_t vbId;
179    vb_bgfetch_queue_t &fetches;
180};
181
182struct StatResponseCtx {
183public:
184    StatResponseCtx(std::map<std::pair<uint16_t, uint16_t>, vbucket_state> &sm,
185                    uint16_t vb) : statMap(sm), vbId(vb) {
186        /* EMPTY */
187    }
188
189    std::map<std::pair<uint16_t, uint16_t>, vbucket_state> &statMap;
190    uint16_t vbId;
191};
192
193struct AllKeysCtx {
194    AllKeysCtx(std::shared_ptr<Callback<const DocKey&>> callback, uint32_t cnt)
195        : cb(callback), count(cnt) { }
196
197    std::shared_ptr<Callback<const DocKey&>> cb;
198    uint32_t count;
199};
200
201couchstore_content_meta_flags CouchRequest::getContentMeta(const Item& it) {
202    couchstore_content_meta_flags rval;
203
204    if (mcbp::datatype::is_json(it.getDataType())) {
205        rval = COUCH_DOC_IS_JSON;
206    } else {
207        rval = COUCH_DOC_NON_JSON_MODE;
208    }
209
210    if (it.getNBytes() > 0 && !mcbp::datatype::is_snappy(it.getDataType())) {
211        //Compress only if a value exists and is not already compressed
212        rval |= COUCH_DOC_IS_COMPRESSED;
213    }
214
215    return rval;
216}
217
218CouchRequest::CouchRequest(const Item& it,
219                           uint64_t rev,
220                           MutationRequestCallback& cb,
221                           bool del,
222                           bool persistDocNamespace)
223    : IORequest(it.getVBucketId(), cb, del, it.getKey()),
224      value(it.getValue()),
225      fileRevNum(rev) {
226    // Collections: TODO: Temporary switch to ensure upgrades don't break.
227    if (persistDocNamespace) {
228        dbDoc.id = {const_cast<char*>(reinterpret_cast<const char*>(
229                            key.getDocNameSpacedData())),
230                    it.getKey().getDocNameSpacedSize()};
231    } else {
232        dbDoc.id = {const_cast<char*>(key.c_str()), it.getKey().size()};
233    }
234
235    if (it.getNBytes()) {
236        dbDoc.data.buf = const_cast<char *>(value->getData());
237        dbDoc.data.size = it.getNBytes();
238    } else {
239        dbDoc.data.buf = NULL;
240        dbDoc.data.size = 0;
241    }
242    meta.setCas(it.getCas());
243    meta.setFlags(it.getFlags());
244    meta.setExptime(it.getExptime());
245    meta.setDataType(it.getDataType());
246
247    dbDocInfo.db_seq = it.getBySeqno();
248
249    // Now allocate space to hold the meta and get it ready for storage
250    dbDocInfo.rev_meta.size = MetaData::getMetaDataSize(MetaData::Version::V1);
251    dbDocInfo.rev_meta.buf = meta.prepareAndGetForPersistence();
252
253    dbDocInfo.rev_seq = it.getRevSeqno();
254    dbDocInfo.size = dbDoc.data.size;
255
256    if (del) {
257        dbDocInfo.deleted =  1;
258    } else {
259        dbDocInfo.deleted = 0;
260    }
261    dbDocInfo.id = dbDoc.id;
262    dbDocInfo.content_meta = getContentMeta(it);
263}
264
/**
 * Construct a store for the given configuration, using the file operations
 * returned by couchstore_get_default_file_ops().
 *
 * @param config configuration of the store
 */
CouchKVStore::CouchKVStore(KVStoreConfig& config)
    : CouchKVStore(config, *couchstore_get_default_file_ops()) {
}
268
/**
 * Constructor doing the actual set-up; the other CouchKVStore constructors
 * visible in this file delegate to it.
 *
 * Creates the data directory, wraps the supplied file operations with
 * stats-collecting ones, sizes the per-vbucket caches and finally calls
 * initialize().
 *
 * @param config       configuration of the store
 * @param ops          file operations to wrap with the stats collectors
 * @param readOnly     forwarded to the KVStore base class
 * @param dbFileRevMap revision map held by this instance
 */
CouchKVStore::CouchKVStore(KVStoreConfig& config,
                           FileOpsInterface& ops,
                           bool readOnly,
                           std::shared_ptr<RevisionMap> dbFileRevMap)
    : KVStore(config, readOnly),
      dbname(config.getDBName()),
      dbFileRevMap(dbFileRevMap),
      intransaction(false),
      scanCounter(0),
      logger(config.getLogger()),
      base_ops(ops) {
    createDataDir(dbname);
    // Separate stats collectors for regular I/O and for compaction I/O
    statCollectingFileOps = getCouchstoreStatsOps(st.fsStats, base_ops);
    statCollectingFileOpsCompaction = getCouchstoreStatsOps(
        st.fsStatsCompaction, base_ops);

    // init db file map with default revision number, 1
    numDbFiles = configuration.getMaxVBuckets();

    // pre-allocate lookup maps (vectors) given we have a relatively
    // small, fixed number of vBuckets.
    cachedDocCount.assign(numDbFiles, Couchbase::RelaxedAtomic<size_t>(0));
    // -1 converts to the largest size_t value.
    // NOTE(review): presumably a "not yet known" sentinel - confirm.
    cachedDeleteCount.assign(numDbFiles, Couchbase::RelaxedAtomic<size_t>(-1));
    cachedFileSize.assign(numDbFiles, Couchbase::RelaxedAtomic<uint64_t>(0));
    cachedSpaceUsed.assign(numDbFiles, Couchbase::RelaxedAtomic<uint64_t>(0));
    cachedVBStates.resize(numDbFiles);

    initialize();
}
298
/**
 * Construct a read-write store using the supplied file operations and a
 * freshly created RevisionMap sized by config.getMaxVBuckets().
 *
 * @param config configuration of the store
 * @param ops    file operations to use
 */
CouchKVStore::CouchKVStore(KVStoreConfig& config, FileOpsInterface& ops)
    : CouchKVStore(config,
                   ops,
                   false /*readonly*/,
                   std::make_shared<RevisionMap>(config.getMaxVBuckets())) {
}
305
/**
 * Make a read-only CouchKVStore from this object.
 *
 * The new store is built from this object's configuration and is handed
 * this object's dbFileRevMap, so both instances refer to the same revision
 * map.
 *
 * @return owning pointer to the newly created read-only store
 */
std::unique_ptr<CouchKVStore> CouchKVStore::makeReadOnlyStore() {
    // Not using make_unique due to the private constructor we're calling
    return std::unique_ptr<CouchKVStore>(
            new CouchKVStore(configuration, dbFileRevMap));
}
314
/**
 * Construct a read-only store that uses the default couchstore file
 * operations and the supplied revision map (used by makeReadOnlyStore()).
 *
 * @param config       configuration of the store
 * @param dbFileRevMap revision map to use
 */
CouchKVStore::CouchKVStore(KVStoreConfig& config,
                           std::shared_ptr<RevisionMap> dbFileRevMap)
    : CouchKVStore(config,
                   *couchstore_get_default_file_ops(),
                   true /*readonly*/,
                   dbFileRevMap) {
}
322
323void CouchKVStore::initialize() {
324    std::vector<uint16_t> vbids;
325    std::vector<std::string> files;
326    discoverDbFiles(dbname, files);
327    populateFileNameMap(files, &vbids);
328
329    couchstore_error_t errorCode;
330
331    std::vector<uint16_t>::iterator itr = vbids.begin();
332    for (; itr != vbids.end(); ++itr) {
333        uint16_t id = *itr;
334        DbHolder db(*this);
335        errorCode = openDB(id, db, COUCHSTORE_OPEN_FLAG_RDONLY);
336        if (errorCode == COUCHSTORE_SUCCESS) {
337            readVBState(db, id);
338            /* update stat */
339            ++st.numLoadedVb;
340        } else {
341            logger.log(EXTENSION_LOG_WARNING,
342                       "CouchKVStore::initialize: openDB"
343                       " error:%s, name:%s/%" PRIu16 ".couch.%" PRIu64,
344                       couchstore_strerror(errorCode),
345                       dbname.c_str(),
346                       id,
347                       db.getFileRev());
348            cachedVBStates[id] = NULL;
349        }
350
351        if (!isReadOnly()) {
352            removeCompactFile(dbname, id);
353        }
354    }
355}
356
/// Destructor: calls close() on this store.
CouchKVStore::~CouchKVStore() {
    close();
}
360
361void CouchKVStore::reset(uint16_t vbucketId) {
362    if (isReadOnly()) {
363        throw std::logic_error("CouchKVStore::reset: Not valid on a read-only "
364                        "object.");
365    }
366
367    vbucket_state* state = getVBucketState(vbucketId);
368    if (state) {
369        state->reset();
370
371        cachedDocCount[vbucketId] = 0;
372        cachedDeleteCount[vbucketId] = 0;
373        cachedFileSize[vbucketId] = 0;
374        cachedSpaceUsed[vbucketId] = 0;
375
376        // Unlink the current revision and then increment it to ensure any
377        // pending delete doesn't delete us. Note that the expectation is that
378        // some higher level per VB lock is required to prevent data-races here.
379        // KVBucket::vb_mutexes is used in this case.
380        unlinkCouchFile(vbucketId, (*dbFileRevMap)[vbucketId]);
381        incrementRevision(vbucketId);
382
383        setVBucketState(
384                vbucketId, *state, VBStatePersist::VBSTATE_PERSIST_WITH_COMMIT);
385    } else {
386        throw std::invalid_argument("CouchKVStore::reset: No entry in cached "
387                        "states for vbucket " + std::to_string(vbucketId));
388    }
389}
390
391void CouchKVStore::set(const Item& itm,
392                       Callback<TransactionContext, mutation_result>& cb) {
393    if (isReadOnly()) {
394        throw std::logic_error("CouchKVStore::set: Not valid on a read-only "
395                        "object.");
396    }
397    if (!intransaction) {
398        throw std::invalid_argument("CouchKVStore::set: intransaction must be "
399                        "true to perform a set operation.");
400    }
401
402    bool deleteItem = false;
403    MutationRequestCallback requestcb;
404    uint64_t fileRev = (*dbFileRevMap)[itm.getVBucketId()];
405
406    // each req will be de-allocated after commit
407    requestcb.setCb = &cb;
408    CouchRequest* req =
409            new CouchRequest(itm,
410                             fileRev,
411                             requestcb,
412                             deleteItem,
413                             configuration.shouldPersistDocNamespace());
414    pendingReqsQ.push_back(req);
415}
416
417GetValue CouchKVStore::get(const StoredDocKey& key,
418                           uint16_t vb,
419                           bool fetchDelete) {
420    DbHolder db(*this);
421    couchstore_error_t errCode = openDB(vb, db, COUCHSTORE_OPEN_FLAG_RDONLY);
422    if (errCode != COUCHSTORE_SUCCESS) {
423        ++st.numGetFailure;
424        logger.log(EXTENSION_LOG_WARNING,
425                   "CouchKVStore::get: openDB error:%s, vb:%" PRIu16,
426                   couchstore_strerror(errCode), vb);
427        return GetValue(nullptr, couchErr2EngineErr(errCode));
428    }
429
430    GetValue gv = getWithHeader(db, key, vb, GetMetaOnly::No, fetchDelete);
431    return gv;
432}
433
434GetValue CouchKVStore::getWithHeader(void* dbHandle,
435                                     const StoredDocKey& key,
436                                     uint16_t vb,
437                                     GetMetaOnly getMetaOnly,
438                                     bool fetchDelete) {
439    Db *db = (Db *)dbHandle;
440    auto start = ProcessClock::now();
441    DocInfo *docInfo = NULL;
442    sized_buf id;
443    GetValue rv;
444
445    if (configuration.shouldPersistDocNamespace()) {
446        id = {const_cast<char*>(reinterpret_cast<const char*>(
447                      key.getDocNameSpacedData())),
448              key.getDocNameSpacedSize()};
449
450    } else {
451        id = {const_cast<char*>(reinterpret_cast<const char*>(key.data())),
452              key.size()};
453    }
454
455    couchstore_error_t errCode = couchstore_docinfo_by_id(db, (uint8_t *)id.buf,
456                                                          id.size, &docInfo);
457    if (errCode != COUCHSTORE_SUCCESS) {
458        if (getMetaOnly == GetMetaOnly::No) {
459            // log error only if this is non-xdcr case
460            logger.log(EXTENSION_LOG_WARNING,
461                       "CouchKVStore::getWithHeader: couchstore_docinfo_by_id "
462                       "error:%s [%s], vb:%" PRIu16,
463                       couchstore_strerror(errCode),
464                       couchkvstore_strerrno(db, errCode).c_str(), vb);
465        }
466    } else {
467        if (docInfo == nullptr) {
468            throw std::logic_error("CouchKVStore::getWithHeader: "
469                    "couchstore_docinfo_by_id returned success but docInfo "
470                    "is NULL");
471        }
472        errCode = fetchDoc(db, docInfo, rv, vb, getMetaOnly);
473        if (errCode != COUCHSTORE_SUCCESS) {
474            logger.log(EXTENSION_LOG_WARNING,
475                       "CouchKVStore::getWithHeader: fetchDoc error:%s [%s],"
476                       " vb:%" PRIu16 ", deleted:%s",
477                       couchstore_strerror(errCode),
478                       couchkvstore_strerrno(db, errCode).c_str(), vb,
479                       docInfo->deleted ? "yes" : "no");
480        }
481
482        // record stats
483        st.readTimeHisto.add(
484                std::chrono::duration_cast<std::chrono::microseconds>(
485                        ProcessClock::now() - start));
486        if (errCode == COUCHSTORE_SUCCESS) {
487            st.readSizeHisto.add(key.size() + rv.item->getNBytes());
488        }
489    }
490
491    if(errCode != COUCHSTORE_SUCCESS) {
492        ++st.numGetFailure;
493    }
494
495    couchstore_free_docinfo(docInfo);
496    rv.setStatus(couchErr2EngineErr(errCode));
497    return rv;
498}
499
500void CouchKVStore::getMulti(uint16_t vb, vb_bgfetch_queue_t &itms) {
501    if (itms.empty()) {
502        return;
503    }
504    int numItems = itms.size();
505
506    DbHolder db(*this);
507    couchstore_error_t errCode = openDB(vb, db, COUCHSTORE_OPEN_FLAG_RDONLY);
508    if (errCode != COUCHSTORE_SUCCESS) {
509        logger.log(EXTENSION_LOG_WARNING,
510                   "CouchKVStore::getMulti: openDB error:%s, "
511                   "vb:%" PRIu16 ", numDocs:%d",
512                   couchstore_strerror(errCode), vb, numItems);
513        st.numGetFailure += numItems;
514        for (auto& item : itms) {
515            item.second.value.setStatus(ENGINE_NOT_MY_VBUCKET);
516        }
517        return;
518    }
519
520    size_t idx = 0;
521    std::vector<sized_buf> ids(itms.size());
522    for (auto& item : itms) {
523        if (configuration.shouldPersistDocNamespace()) {
524            ids[idx] = {const_cast<char*>(reinterpret_cast<const char*>(
525                                item.first.getDocNameSpacedData())),
526                        item.first.getDocNameSpacedSize()};
527        } else {
528            ids[idx] = {const_cast<char*>(reinterpret_cast<const char*>(
529                                item.first.data())),
530                        item.first.size()};
531        }
532
533        ++idx;
534    }
535
536    GetMultiCbCtx ctx(*this, vb, itms);
537
538    errCode = couchstore_docinfos_by_id(
539            db, ids.data(), itms.size(), 0, getMultiCbC, &ctx);
540    if (errCode != COUCHSTORE_SUCCESS) {
541        st.numGetFailure += numItems;
542        logger.log(EXTENSION_LOG_WARNING,
543                   "CouchKVStore::getMulti: "
544                   "couchstore_docinfos_by_id error %s [%s], vb:%" PRIu16,
545                   couchstore_strerror(errCode),
546                   couchkvstore_strerrno(db, errCode).c_str(),
547                   vb);
548        for (auto& item : itms) {
549            item.second.value.setStatus(couchErr2EngineErr(errCode));
550        }
551    }
552
553    // If available, record how many reads() we did for this getMulti;
554    // and the average reads per document.
555    auto* stats = couchstore_get_db_filestats(db);
556    if (stats != nullptr) {
557        const auto readCount = stats->getReadCount();
558        st.getMultiFsReadCount += readCount;
559        st.getMultiFsReadHisto.add(readCount);
560        st.getMultiFsReadPerDocHisto.add(readCount / itms.size());
561    }
562}
563
564void CouchKVStore::del(const Item& itm, Callback<TransactionContext, int>& cb) {
565    if (isReadOnly()) {
566        throw std::logic_error("CouchKVStore::del: Not valid on a read-only "
567                        "object.");
568    }
569    if (!intransaction) {
570        throw std::invalid_argument("CouchKVStore::del: intransaction must be "
571                        "true to perform a delete operation.");
572    }
573
574    uint64_t fileRev = (*dbFileRevMap)[itm.getVBucketId()];
575    MutationRequestCallback requestcb;
576    requestcb.delCb = &cb;
577    CouchRequest* req =
578            new CouchRequest(itm,
579                             fileRev,
580                             requestcb,
581                             true,
582                             configuration.shouldPersistDocNamespace());
583    pendingReqsQ.push_back(req);
584}
585
/**
 * Delete the data file of a vbucket at a given revision by calling
 * unlinkCouchFile().
 *
 * @param vbucket vbucket whose file is removed
 * @param fileRev revision of the file to remove
 * @throws std::logic_error if this store is read-only
 */
void CouchKVStore::delVBucket(uint16_t vbucket, uint64_t fileRev) {
    if (isReadOnly()) {
        throw std::logic_error("CouchKVStore::delVBucket: Not valid on a "
                        "read-only object.");
    }

    unlinkCouchFile(vbucket, fileRev);
}
594
595std::vector<vbucket_state *> CouchKVStore::listPersistedVbuckets() {
596    std::vector<vbucket_state*> result;
597    for (const auto& vb : cachedVBStates) {
598        result.emplace_back(vb.get());
599    }
600    return result;
601}
602
603void CouchKVStore::getPersistedStats(std::map<std::string,
604                                     std::string> &stats) {
605    std::vector<char> buffer;
606    std::string fname = dbname + "/stats.json";
607    if (access(fname.c_str(), R_OK) == -1) {
608        return ;
609    }
610
611    std::ifstream session_stats;
612    session_stats.exceptions (session_stats.failbit | session_stats.badbit);
613    try {
614        session_stats.open(fname.c_str(), std::ios::binary);
615        session_stats.seekg(0, std::ios::end);
616        int flen = session_stats.tellg();
617        if (flen < 0) {
618            logger.log(EXTENSION_LOG_WARNING, "CouchKVStore::getPersistedStats:"
619                       " Error in session stats ifstream!!!");
620            session_stats.close();
621            return;
622        }
623        session_stats.seekg(0, std::ios::beg);
624        buffer.resize(flen + 1);
625        session_stats.read(buffer.data(), flen);
626        session_stats.close();
627        buffer[flen] = '\0';
628
629        cJSON *json_obj = cJSON_Parse(buffer.data());
630        if (!json_obj) {
631            logger.log(EXTENSION_LOG_WARNING, "CouchKVStore::getPersistedStats:"
632                       " Failed to parse the session stats json doc!!!");
633            return;
634        }
635
636        int json_arr_size = cJSON_GetArraySize(json_obj);
637        for (int i = 0; i < json_arr_size; ++i) {
638            cJSON *obj = cJSON_GetArrayItem(json_obj, i);
639            if (obj) {
640                stats[obj->string] = obj->valuestring ? obj->valuestring : "";
641            }
642        }
643        cJSON_Delete(json_obj);
644
645    } catch (const std::ifstream::failure &e) {
646        logger.log(EXTENSION_LOG_WARNING,
647                   "CouchKVStore::getPersistedStats: Failed to load the engine "
648                   "session stats due to IO exception \"%s\"", e.what());
649    } catch (...) {
650        logger.log(EXTENSION_LOG_WARNING,
651                   "CouchKVStore::getPersistedStats: Failed to load the engine "
652                   "session stats due to IO exception");
653    }
654}
655
/**
 * Compose the path of a vbucket data file.
 *
 * @param dbname directory holding the data files
 * @param vbid   vbucket id
 * @param rev    file revision
 * @return "<dbname>/<vbid>.couch.<rev>"
 */
static std::string getDBFileName(const std::string &dbname,
                                 uint16_t vbid,
                                 uint64_t rev) {
    std::string path(dbname);
    path += "/";
    path += std::to_string(vbid);
    path += ".couch.";
    path += std::to_string(rev);
    return path;
}
662
663static int edit_docinfo_hook(DocInfo **info, const sized_buf *item) {
664    // Examine the metadata of the doc
665    auto documentMetaData = MetaDataFactory::createMetaData((*info)->rev_meta);
666    // Allocate latest metadata
667    std::unique_ptr<MetaData> metadata;
668    if (documentMetaData->getVersionInitialisedFrom() == MetaData::Version::V0) {
669        // Metadata doesn't have flex_meta_code/datatype. Provision space for
670        // these paramenters.
671
672        // If the document is compressed we need to inflate it to
673        // determine if it is json or not.
674        cb::compression::Buffer inflated;
675        cb::const_char_buffer data {item->buf, item->size};
676        if (((*info)->content_meta | COUCH_DOC_IS_COMPRESSED) ==
677                (*info)->content_meta) {
678            if (!cb::compression::inflate(cb::compression::Algorithm::Snappy,
679                                          data, inflated)) {
680                throw std::runtime_error(
681                    "edit_docinfo_hook: failed to inflate document with seqno: " +
682                    std::to_string((*info)->db_seq) + " revno: " +
683                    std::to_string((*info)->rev_seq));
684            }
685            data = inflated;
686        }
687
688        protocol_binary_datatype_t datatype = PROTOCOL_BINARY_RAW_BYTES;
689        if (checkUTF8JSON(reinterpret_cast<const uint8_t*>(data.data()),
690                          data.size())) {
691            datatype = PROTOCOL_BINARY_DATATYPE_JSON;
692        }
693
694        // Now create a blank latest metadata.
695        metadata = MetaDataFactory::createMetaData();
696        // Copy the metadata this will pull across available V0 fields.
697        *metadata = *documentMetaData;
698
699        // Setup flex code and datatype
700        metadata->setFlexCode();
701        metadata->setDataType(datatype);
702    } else {
703        // The metadata in the document is V1 and needs no changes.
704        return 0;
705    }
706
707    // the docInfo pointer includes the DocInfo and the data it points to.
708    // this must be a pointer which cb_free() can deallocate
709    char* buffer = static_cast<char*>(cb_calloc(1, sizeof(DocInfo) +
710                             (*info)->id.size +
711                             MetaData::getMetaDataSize(MetaData::Version::V1)));
712
713
714    DocInfo* docInfo = reinterpret_cast<DocInfo*>(buffer);
715
716    // Deep-copy the incoming DocInfo, then we'll fix the pointers/buffer data
717    *docInfo = **info;
718
719    // Correct the id buffer
720    docInfo->id.buf = buffer + sizeof(DocInfo);
721    std::memcpy(docInfo->id.buf, (*info)->id.buf, docInfo->id.size);
722
723    // Correct the rev_meta pointer and fill it in.
724    docInfo->rev_meta.size = MetaData::getMetaDataSize(MetaData::Version::V1);
725    docInfo->rev_meta.buf = buffer + sizeof(DocInfo) + docInfo->id.size;
726    metadata->copyToBuf(docInfo->rev_meta);
727
728    // Free the orginal
729    couchstore_free_docinfo(*info);
730
731    // Return the newly allocated docinfo with corrected metadata
732    *info = docInfo;
733
734    return 1;
735}
736
737/**
738 * Notify the expiry callback that a document has expired
739 *
740 * @param info     document information for the expired item
741 * @param metadata metadata of the document
742 * @param item     buffer containing data and size
743 * @param ctx      context for compaction
744 * @param currtime current time
745 */
746static int notify_expired_item(DocInfo& info,
747                               MetaData& metadata,
748                               sized_buf item,
749                               compaction_ctx& ctx,
750                               time_t currtime) {
751    cb::char_buffer data;
752    cb::compression::Buffer inflated;
753
754    if (mcbp::datatype::is_xattr(metadata.getDataType())) {
755        if (item.buf == nullptr) {
756            // We need to pass on the entire document to the callback
757            return COUCHSTORE_COMPACT_NEED_BODY;
758        }
759
760        // A document on disk is marked snappy in two ways.
761        // 1) info.content_meta if the document was compressed by couchstore
762        // 2) datatype snappy if the document was already compressed when stored
763        if ((info.content_meta & COUCH_DOC_IS_COMPRESSED) ||
764            mcbp::datatype::is_snappy(metadata.getDataType())) {
765            using namespace cb::compression;
766
767            if (!inflate(Algorithm::Snappy, {item.buf, item.size}, inflated)) {
768                LOG(EXTENSION_LOG_WARNING,
769                    "time_purge_hook: failed to inflate document with seqno %" PRIu64 ""
770                    "revno: %" PRIu64, info.db_seq, info.rev_seq);
771                return COUCHSTORE_ERROR_CORRUPT;
772            }
773            // Now remove snappy bit
774            metadata.setDataType(metadata.getDataType() &
775                                 ~PROTOCOL_BINARY_DATATYPE_SNAPPY);
776            data = inflated;
777        }
778    }
779
780    // Collections: TODO: Restore to stored namespace
781    Item it(makeDocKey(info.id, ctx.config->shouldPersistDocNamespace()),
782            metadata.getFlags(),
783            metadata.getExptime(),
784            data.buf,
785            data.len,
786            metadata.getDataType(),
787            metadata.getCas(),
788            info.db_seq,
789            ctx.db_file_id,
790            info.rev_seq);
791
792    it.setRevSeqno(info.rev_seq);
793    ctx.expiryCallback->callback(it, currtime);
794
795    return COUCHSTORE_SUCCESS;
796}
797
/**
 * Per-document hook deciding whether a document is kept or dropped, and
 * driving the expiry and bloom-filter callbacks.
 *
 * NOTE(review): presumably installed as couchstore's compaction hook; the
 * registration is not visible in this section of the file.
 *
 * @param d     couchstore handle of the database being processed
 * @param info  document being visited; nullptr signals that compaction has
 *              finished, in which case the purge sequence is written
 * @param item  document body, forwarded to notify_expired_item()
 * @param ctx_p the compaction_ctx for this run, passed as void*
 * @return COUCHSTORE_COMPACT_KEEP_ITEM or COUCHSTORE_COMPACT_DROP_ITEM for
 *         a document, the result of couchstore_set_purge_seq() at the end
 *         of compaction, or an error / COUCHSTORE_COMPACT_NEED_BODY value
 *         propagated from couchstore_db_info() or notify_expired_item()
 */
static int time_purge_hook(Db* d, DocInfo* info, sized_buf item, void* ctx_p) {
    compaction_ctx* ctx = static_cast<compaction_ctx*>(ctx_p);
    const uint16_t vbid = ctx->db_file_id;

    if (info == nullptr) {
        // Compaction finished
        return couchstore_set_purge_seq(d, ctx->max_purged_seq[vbid]);
    }

    DbInfo infoDb;
    auto err = couchstore_db_info(d, &infoDb);
    if (err != COUCHSTORE_SUCCESS) {
        LOG(EXTENSION_LOG_WARNING,
            "time_purge_hook: couchstore_db_info() failed: %s",
            couchstore_strerror(err));
        return err;
    }

    // Highest seqno purged so far for this vbucket (entry created as 0 if
    // this is the first document seen).
    uint64_t max_purge_seq = 0;
    auto it = ctx->max_purged_seq.find(vbid);

    if (it == ctx->max_purged_seq.end()) {
        ctx->max_purged_seq[vbid] = 0;
    } else {
        max_purge_seq = it->second;
    }

    // Only documents whose metadata is at least V0-sized are examined
    if (info->rev_meta.size >= MetaData::getMetaDataSize(MetaData::Version::V0)) {
        auto metadata = MetaDataFactory::createMetaData(info->rev_meta);
        uint32_t exptime = metadata->getExptime();

        // Is the collections eraser installed?
        if (ctx->collectionsEraser &&
            ctx->collectionsEraser(
                    makeDocKey(info->id,
                               ctx->config->shouldPersistDocNamespace()),
                    int64_t(info->db_seq),
                    info->deleted,
                    ctx->eraserContext)) {
            ctx->stats.collectionsItemsPurged++;
            return COUCHSTORE_COMPACT_DROP_ITEM;
        }

        if (info->deleted) {
            // The document with the database's last sequence number is
            // never purged.
            if (info->db_seq != infoDb.last_sequence) {
                if (ctx->drop_deletes) { // all deleted items must be dropped ...
                    if (max_purge_seq < info->db_seq) {
                        ctx->max_purged_seq[vbid] = info->db_seq; // track max_purged_seq
                    }
                    ctx->stats.tombstonesPurged++;
                    return COUCHSTORE_COMPACT_DROP_ITEM;      // ...unconditionally
                }
                // Otherwise purge when exptime is older than the purge
                // timestamp and, if a purge seqno limit is set, the
                // document's seqno is within it.
                if (exptime < ctx->purge_before_ts &&
                        (!ctx->purge_before_seq ||
                         info->db_seq <= ctx->purge_before_seq)) {
                    if (max_purge_seq < info->db_seq) {
                        ctx->max_purged_seq[vbid] = info->db_seq;
                    }
                    ctx->stats.tombstonesPurged++;
                    return COUCHSTORE_COMPACT_DROP_ITEM;
                }
            }
        } else {
            // Live document: report it if it has an expiry time in the past
            time_t currtime = ep_real_time();
            if (exptime && exptime < currtime) {
                int ret;
                try {
                    ret = notify_expired_item(*info, *metadata, item,
                                             *ctx, currtime);
                } catch (const std::bad_alloc&) {
                    LOG(EXTENSION_LOG_WARNING,
                        "time_purge_hook: memory allocation failed");
                    return COUCHSTORE_ERROR_ALLOC_FAIL;
                }

                if (ret != COUCHSTORE_SUCCESS) {
                    return ret;
                }
            }
        }
    }

    // Every document that was not dropped above is reported to the bloom
    // filter callback, when one is installed.
    if (ctx->bloomFilterCallback) {
        bool deleted = info->deleted;
        // Collections: TODO: Permanently restore to stored namespace
        DocKey key = makeDocKey(
                info->id, ctx->config->shouldPersistDocNamespace());

        try {
            ctx->bloomFilterCallback->callback(
                    ctx->db_file_id, key, deleted);
        } catch (std::runtime_error& re) {
            // A failing bloom filter callback is logged but does not stop
            // the item from being kept.
            LOG(EXTENSION_LOG_WARNING,
                "time_purge_hook: exception occurred when invoking the "
                "bloomfilter callback on vbucket:%" PRIu16
                " - Details: %s", vbid, re.what());
        }
    }

    return COUCHSTORE_COMPACT_KEEP_ITEM;
}
899
900bool CouchKVStore::compactDB(compaction_ctx *hook_ctx) {
901    bool result = false;
902
903    try {
904        result = compactDBInternal(hook_ctx, edit_docinfo_hook);
905    } catch(std::logic_error& le) {
906        LOG(EXTENSION_LOG_WARNING,
907            "CouchKVStore::compactDB: exception while performing "
908            "compaction for vbucket:%" PRIu16
909            " - Details: %s", hook_ctx->db_file_id, le.what());
910    }
911    if (!result) {
912        ++st.numCompactionFailure;
913    }
914    return result;
915}
916
917FileInfo CouchKVStore::toFileInfo(const DbInfo& info) {
918    return FileInfo{
919            info.doc_count, info.deleted_count, info.file_size, info.purge_seq};
920}
921
922bool CouchKVStore::compactDBInternal(compaction_ctx* hook_ctx,
923                                     couchstore_docinfo_hook docinfo_hook) {
924    if (isReadOnly()) {
925        throw std::logic_error("CouchKVStore::compactDB: Cannot perform "
926                        "on a read-only instance.");
927    }
928    couchstore_compact_hook       hook = time_purge_hook;
929    couchstore_docinfo_hook dhook = docinfo_hook;
930    FileOpsInterface         *def_iops = statCollectingFileOpsCompaction.get();
931    DbHolder compactdb(*this);
932    DbHolder targetDb(*this);
933    couchstore_error_t         errCode = COUCHSTORE_SUCCESS;
934    ProcessClock::time_point     start = ProcessClock::now();
935    std::string                 dbfile;
936    std::string           compact_file;
937    std::string               new_file;
938    DbInfo                        info;
939    uint16_t                      vbid = hook_ctx->db_file_id;
940    hook_ctx->config = &configuration;
941
942    TRACE_EVENT1("CouchKVStore", "compactDB", "vbid", vbid);
943
944    // Open the source VBucket database file ...
945    errCode = openDB(
946            vbid, compactdb, (uint64_t)COUCHSTORE_OPEN_FLAG_RDONLY, def_iops);
947    if (errCode != COUCHSTORE_SUCCESS) {
948        logger.log(EXTENSION_LOG_WARNING,
949                   "CouchKVStore::compactDB openDB error:%s, vb:%" PRIu16
950                   ", fileRev:%" PRIu64,
951                   couchstore_strerror(errCode),
952                   vbid,
953                   compactdb.getFileRev());
954        return false;
955    }
956
957    uint64_t new_rev = compactdb.getFileRev() + 1;
958
959    // Build the temporary vbucket.compact file name
960    dbfile = getDBFileName(dbname, vbid, compactdb.getFileRev());
961    compact_file = dbfile + ".compact";
962
963    couchstore_open_flags flags(COUCHSTORE_COMPACT_FLAG_UPGRADE_DB);
964
965    couchstore_db_info(compactdb, &info);
966    hook_ctx->stats.pre = toFileInfo(info);
967
968    /**
969     * This flag disables IO buffering in couchstore which means
970     * file operations will trigger syscalls immediately. This has
971     * a detrimental impact on performance and is only intended
972     * for testing.
973     */
974    if(!configuration.getBuffered()) {
975        flags |= COUCHSTORE_OPEN_FLAG_UNBUFFERED;
976    }
977
978    // Should automatic fsync() be configured for compaction?
979    const auto periodicSyncBytes = configuration.getPeriodicSyncBytes();
980    if (periodicSyncBytes != 0) {
981        flags |= couchstore_encode_periodic_sync_flags(periodicSyncBytes);
982    }
983
984    // Perform COMPACTION of vbucket.couch.rev into vbucket.couch.rev.compact
985    errCode = couchstore_compact_db_ex(compactdb,
986                                       compact_file.c_str(),
987                                       flags,
988                                       hook,
989                                       dhook,
990                                       hook_ctx,
991                                       def_iops);
992    if (errCode != COUCHSTORE_SUCCESS) {
993        logger.log(EXTENSION_LOG_WARNING,
994                   "CouchKVStore::compactDB:couchstore_compact_db_ex "
995                   "error:%s [%s], name:%s",
996                   couchstore_strerror(errCode),
997                   couchkvstore_strerrno(compactdb, errCode).c_str(),
998                   dbfile.c_str());
999        return false;
1000    }
1001
1002    // Close the source Database File once compaction is done
1003    compactdb.close();
1004
1005    // Rename the .compact file to one with the next revision number
1006    new_file = getDBFileName(dbname, vbid, new_rev);
1007    if (rename(compact_file.c_str(), new_file.c_str()) != 0) {
1008        logger.log(EXTENSION_LOG_WARNING,
1009                   "CouchKVStore::compactDB: rename error:%s, old:%s, new:%s",
1010                   cb_strerror().c_str(), compact_file.c_str(), new_file.c_str());
1011
1012        removeCompactFile(compact_file);
1013        return false;
1014    }
1015
1016    // Open the newly compacted VBucket database file ...
1017    errCode = openSpecificDB(
1018            vbid, new_rev, targetDb, (uint64_t)COUCHSTORE_OPEN_FLAG_RDONLY);
1019    if (errCode != COUCHSTORE_SUCCESS) {
1020        logger.log(EXTENSION_LOG_WARNING,
1021                   "CouchKVStore::compactDB: openDB#2 error:%s, file:%s, "
1022                   "fileRev:%" PRIu64,
1023                   couchstore_strerror(errCode),
1024                   new_file.c_str(),
1025                   targetDb.getFileRev());
1026        if (remove(new_file.c_str()) != 0) {
1027            logger.log(EXTENSION_LOG_WARNING,
1028                       "CouchKVStore::compactDB: remove error:%s, path:%s",
1029                       cb_strerror().c_str(), new_file.c_str());
1030        }
1031        return false;
1032    }
1033
1034    // Update the global VBucket file map so all operations use the new file
1035    updateDbFileMap(vbid, new_rev);
1036
1037    logger.log(EXTENSION_LOG_INFO,
1038               "INFO: created new couch db file, name:%s rev:%" PRIu64,
1039               new_file.c_str(), new_rev);
1040
1041    couchstore_db_info(targetDb.getDb(), &info);
1042    hook_ctx->stats.post = toFileInfo(info);
1043
1044    cachedFileSize[vbid] = info.file_size;
1045    cachedSpaceUsed[vbid] = info.space_used;
1046
1047    // also update cached state with dbinfo
1048    vbucket_state* state = getVBucketState(vbid);
1049    if (state) {
1050        state->highSeqno = info.last_sequence;
1051        state->purgeSeqno = info.purge_seq;
1052        cachedDeleteCount[vbid] = info.deleted_count;
1053        cachedDocCount[vbid] = info.doc_count;
1054    }
1055
1056    // Removing the stale couch file
1057    unlinkCouchFile(vbid, compactdb.getFileRev());
1058
1059    st.compactHisto.add(std::chrono::duration_cast<std::chrono::microseconds>(
1060            ProcessClock::now() - start));
1061
1062    return true;
1063}
1064
1065vbucket_state * CouchKVStore::getVBucketState(uint16_t vbucketId) {
1066    return cachedVBStates[vbucketId].get();
1067}
1068
1069bool CouchKVStore::setVBucketState(uint16_t vbucketId,
1070                                   const vbucket_state& vbstate,
1071                                   VBStatePersist options) {
1072    std::map<uint16_t, uint64_t>::iterator mapItr;
1073    couchstore_error_t errorCode;
1074
1075    if (options == VBStatePersist::VBSTATE_PERSIST_WITHOUT_COMMIT ||
1076            options == VBStatePersist::VBSTATE_PERSIST_WITH_COMMIT) {
1077        DbHolder db(*this);
1078        errorCode =
1079                openDB(vbucketId, db, (uint64_t)COUCHSTORE_OPEN_FLAG_CREATE);
1080        if (errorCode != COUCHSTORE_SUCCESS) {
1081            ++st.numVbSetFailure;
1082            logger.log(EXTENSION_LOG_WARNING,
1083                       "CouchKVStore::setVBucketState: openDB error:%s, "
1084                       "vb:%" PRIu16 ", fileRev:%" PRIu64,
1085                       couchstore_strerror(errorCode),
1086                       vbucketId,
1087                       db.getFileRev());
1088            return false;
1089        }
1090
1091        errorCode = saveVBState(db, vbstate);
1092        if (errorCode != COUCHSTORE_SUCCESS) {
1093            ++st.numVbSetFailure;
1094            logger.log(EXTENSION_LOG_WARNING,
1095                       "CouchKVStore:setVBucketState: saveVBState error:%s, "
1096                       "vb:%" PRIu16 ", fileRev:%" PRIu64,
1097                       couchstore_strerror(errorCode),
1098                       vbucketId,
1099                       db.getFileRev());
1100            return false;
1101        }
1102
1103        if (options == VBStatePersist::VBSTATE_PERSIST_WITH_COMMIT) {
1104            errorCode = couchstore_commit(db);
1105            if (errorCode != COUCHSTORE_SUCCESS) {
1106                ++st.numVbSetFailure;
1107                logger.log(EXTENSION_LOG_WARNING,
1108                           "CouchKVStore:setVBucketState: couchstore_commit "
1109                           "error:%s [%s], vb:%" PRIu16 ", rev:%" PRIu64,
1110                           couchstore_strerror(errorCode),
1111                           couchkvstore_strerrno(db, errorCode).c_str(),
1112                           vbucketId,
1113                           db.getFileRev());
1114                return false;
1115            }
1116        }
1117
1118        DbInfo info;
1119        errorCode = couchstore_db_info(db, &info);
1120        if (errorCode != COUCHSTORE_SUCCESS) {
1121            logger.log(EXTENSION_LOG_WARNING,
1122                       "CouchKVStore::setVBucketState: couchstore_db_info "
1123                       "error:%s, vb:%" PRIu16, couchstore_strerror(errorCode),
1124                       vbucketId);
1125        } else {
1126            cachedSpaceUsed[vbucketId] = info.space_used;
1127            cachedFileSize[vbucketId] = info.file_size;
1128        }
1129    } else {
1130        throw std::invalid_argument("CouchKVStore::setVBucketState: invalid vb state "
1131                        "persist option specified for vbucket id:" +
1132                        std::to_string(vbucketId));
1133    }
1134
1135    return true;
1136}
1137
1138bool CouchKVStore::snapshotVBucket(uint16_t vbucketId,
1139                                   const vbucket_state &vbstate,
1140                                   VBStatePersist options) {
1141    if (isReadOnly()) {
1142        logger.log(EXTENSION_LOG_WARNING,
1143                   "CouchKVStore::snapshotVBucket: cannot be performed on a "
1144                   "read-only KVStore instance");
1145        return false;
1146    }
1147
1148    auto start = ProcessClock::now();
1149
1150    if (updateCachedVBState(vbucketId, vbstate) &&
1151         (options == VBStatePersist::VBSTATE_PERSIST_WITHOUT_COMMIT ||
1152          options == VBStatePersist::VBSTATE_PERSIST_WITH_COMMIT)) {
1153        vbucket_state* vbs = getVBucketState(vbucketId);
1154        if (!setVBucketState(vbucketId, *vbs, options)) {
1155            logger.log(EXTENSION_LOG_WARNING,
1156                       "CouchKVStore::snapshotVBucket: setVBucketState failed "
1157                       "state:%s, vb:%" PRIu16,
1158                       VBucket::toString(vbstate.state), vbucketId);
1159            return false;
1160        }
1161    }
1162
1163    LOG(EXTENSION_LOG_DEBUG,
1164        "CouchKVStore::snapshotVBucket: Snapshotted vbucket:%" PRIu16 " state:%s",
1165        vbucketId,
1166        vbstate.toJSON().c_str());
1167
1168    st.snapshotHisto.add(std::chrono::duration_cast<std::chrono::microseconds>(
1169            ProcessClock::now() - start));
1170
1171    return true;
1172}
1173
1174StorageProperties CouchKVStore::getStorageProperties() {
1175    StorageProperties rv(StorageProperties::EfficientVBDump::Yes,
1176                         StorageProperties::EfficientVBDeletion::Yes,
1177                         StorageProperties::PersistedDeletion::Yes,
1178                         StorageProperties::EfficientGet::Yes,
1179                         StorageProperties::ConcurrentWriteCompact::No);
1180    return rv;
1181}
1182
1183bool CouchKVStore::commit(const Item* collectionsManifest) {
1184    if (isReadOnly()) {
1185        throw std::logic_error("CouchKVStore::commit: Not valid on a read-only "
1186                        "object.");
1187    }
1188
1189    if (intransaction) {
1190        if (commit2couchstore(collectionsManifest)) {
1191            intransaction = false;
1192            transactionCtx.reset();
1193        }
1194    }
1195
1196    return !intransaction;
1197}
1198
1199bool CouchKVStore::getStat(const char* name, size_t& value)  {
1200    if (strcmp("failure_compaction", name) == 0) {
1201        value = st.numCompactionFailure.load();
1202        return true;
1203    } else if (strcmp("failure_get", name) == 0) {
1204        value = st.numGetFailure.load();
1205        return true;
1206    } else if (strcmp("io_total_read_bytes", name) == 0) {
1207        value = st.fsStats.totalBytesRead.load() +
1208                st.fsStatsCompaction.totalBytesRead.load();
1209        return true;
1210    } else if (strcmp("io_total_write_bytes", name) == 0) {
1211        value = st.fsStats.totalBytesWritten.load() +
1212                st.fsStatsCompaction.totalBytesWritten.load();
1213        return true;
1214    } else if (strcmp("io_compaction_read_bytes", name) == 0) {
1215        value = st.fsStatsCompaction.totalBytesRead;
1216        return true;
1217    } else if (strcmp("io_compaction_write_bytes", name) == 0) {
1218        value = st.fsStatsCompaction.totalBytesWritten;
1219        return true;
1220    } else if (strcmp("io_bg_fetch_read_count", name) == 0) {
1221        value = st.getMultiFsReadCount;
1222        return true;
1223    }
1224
1225    return false;
1226}
1227
1228void CouchKVStore::pendingTasks() {
1229    if (isReadOnly()) {
1230        throw std::logic_error("CouchKVStore::pendingTasks: Not valid on a "
1231                        "read-only object.");
1232    }
1233
1234    if (!pendingFileDeletions.empty()) {
1235        std::queue<std::string> queue;
1236        pendingFileDeletions.getAll(queue);
1237
1238        while (!queue.empty()) {
1239            std::string filename_str = queue.front();
1240            if (remove(filename_str.c_str()) == -1) {
1241                logger.log(EXTENSION_LOG_WARNING, "CouchKVStore::pendingTasks: "
1242                           "remove error:%d, file%s", errno,
1243                           filename_str.c_str());
1244                if (errno != ENOENT) {
1245                    pendingFileDeletions.push(filename_str);
1246                }
1247            }
1248            queue.pop();
1249        }
1250    }
1251}
1252
1253ScanContext* CouchKVStore::initScanContext(
1254        std::shared_ptr<StatusCallback<GetValue>> cb,
1255        std::shared_ptr<StatusCallback<CacheLookup>> cl,
1256        uint16_t vbid,
1257        uint64_t startSeqno,
1258        DocumentFilter options,
1259        ValueFilter valOptions) {
1260    DbHolder db(*this);
1261    couchstore_error_t errorCode =
1262            openDB(vbid, db, COUCHSTORE_OPEN_FLAG_RDONLY);
1263    if (errorCode != COUCHSTORE_SUCCESS) {
1264        logger.log(EXTENSION_LOG_WARNING,
1265                   "CouchKVStore::initScanContext: openDB error:%s, "
1266                   "name:%s/%" PRIu16 ".couch.%" PRIu64,
1267                   couchstore_strerror(errorCode),
1268                   dbname.c_str(),
1269                   vbid,
1270                   db.getFileRev());
1271        return NULL;
1272    }
1273
1274    DbInfo info;
1275    errorCode = couchstore_db_info(db, &info);
1276    if (errorCode != COUCHSTORE_SUCCESS) {
1277        logger.log(EXTENSION_LOG_WARNING,
1278                   "CouchKVStore::initScanContext: couchstore_db_info error:%s",
1279                   couchstore_strerror(errorCode));
1280        LOG(EXTENSION_LOG_WARNING,
1281            "CouchKVStore::initScanContext: Failed to read DB info for "
1282            "backfill. vb:%" PRIu16 " rev:%" PRIu64 " error: %s",
1283            vbid,
1284            db.getFileRev(),
1285            couchstore_strerror(errorCode));
1286        return NULL;
1287    }
1288
1289    uint64_t count = 0;
1290    errorCode = couchstore_changes_count(
1291            db, startSeqno, std::numeric_limits<uint64_t>::max(), &count);
1292    if (errorCode != COUCHSTORE_SUCCESS) {
1293        LOG(EXTENSION_LOG_WARNING,
1294            "CouchKVStore::initScanContext:Failed to obtain changes "
1295            "count for vb:%" PRIu16 " rev:%" PRIu64 " start_seqno:%" PRIu64
1296            " error: %s",
1297            vbid,
1298            db.getFileRev(),
1299            startSeqno,
1300            couchstore_strerror(errorCode));
1301        return NULL;
1302    }
1303
1304    size_t scanId = scanCounter++;
1305
1306    {
1307        LockHolder lh(scanLock);
1308        scans[scanId] = db.releaseDb();
1309    }
1310
1311    ScanContext* sctx = new ScanContext(cb,
1312                                        cl,
1313                                        vbid,
1314                                        scanId,
1315                                        startSeqno,
1316                                        info.last_sequence,
1317                                        info.purge_seq,
1318                                        options,
1319                                        valOptions,
1320                                        count,
1321                                        configuration);
1322    sctx->logger = &logger;
1323    return sctx;
1324}
1325
1326static couchstore_docinfos_options getDocFilter(const DocumentFilter& filter) {
1327    switch (filter) {
1328    case DocumentFilter::ALL_ITEMS:
1329        return COUCHSTORE_NO_OPTIONS;
1330    case DocumentFilter::NO_DELETES:
1331        return COUCHSTORE_NO_DELETES;
1332    }
1333
1334    std::string err("getDocFilter: Illegal document filter!" +
1335                    std::to_string(static_cast<int>(filter)));
1336    throw std::runtime_error(err);
1337}
1338
1339scan_error_t CouchKVStore::scan(ScanContext* ctx) {
1340    if (!ctx) {
1341        return scan_failed;
1342    }
1343
1344    if (ctx->lastReadSeqno == ctx->maxSeqno) {
1345        return scan_success;
1346    }
1347
1348    TRACE_EVENT_START2("CouchKVStore",
1349                       "scan",
1350                       "vbid",
1351                       ctx->vbid,
1352                       "startSeqno",
1353                       ctx->startSeqno);
1354
1355    Db* db;
1356    {
1357        LockHolder lh(scanLock);
1358        auto itr = scans.find(ctx->scanId);
1359        if (itr == scans.end()) {
1360            return scan_failed;
1361        }
1362
1363        db = itr->second;
1364    }
1365
1366    uint64_t start = ctx->startSeqno;
1367    if (ctx->lastReadSeqno != 0) {
1368        start = ctx->lastReadSeqno + 1;
1369    }
1370
1371    couchstore_error_t errorCode;
1372    errorCode = couchstore_changes_since(db,
1373                                         start,
1374                                         getDocFilter(ctx->docFilter),
1375                                         recordDbDumpC,
1376                                         static_cast<void*>(ctx));
1377
1378    TRACE_EVENT_END1(
1379            "CouchKVStore", "scan", "lastReadSeqno", ctx->lastReadSeqno);
1380
1381    if (errorCode != COUCHSTORE_SUCCESS) {
1382        if (errorCode == COUCHSTORE_ERROR_CANCEL) {
1383            return scan_again;
1384        } else {
1385            logger.log(EXTENSION_LOG_WARNING,
1386                       "CouchKVStore::scan couchstore_changes_since "
1387                       "error:%s [%s]", couchstore_strerror(errorCode),
1388                       couchkvstore_strerrno(db, errorCode).c_str());
1389            return scan_failed;
1390        }
1391    }
1392    return scan_success;
1393}
1394
1395void CouchKVStore::destroyScanContext(ScanContext* ctx) {
1396    if (!ctx) {
1397        return;
1398    }
1399
1400    LockHolder lh(scanLock);
1401    auto itr = scans.find(ctx->scanId);
1402    if (itr != scans.end()) {
1403        closeDatabaseHandle(itr->second);
1404        scans.erase(itr);
1405    }
1406    delete ctx;
1407}
1408
1409DbInfo CouchKVStore::getDbInfo(uint16_t vbid) {
1410    DbHolder db(*this);
1411    couchstore_error_t errCode = openDB(vbid, db, COUCHSTORE_OPEN_FLAG_RDONLY);
1412    if (errCode == COUCHSTORE_SUCCESS) {
1413        DbInfo info;
1414        errCode = couchstore_db_info(db, &info);
1415        if (errCode == COUCHSTORE_SUCCESS) {
1416            return info;
1417        } else {
1418            throw std::runtime_error(
1419                    "CouchKVStore::getDbInfo: failed "
1420                    "to read database info for vBucket " +
1421                    std::to_string(vbid) + " revision " +
1422                    std::to_string(db.getFileRev()) +
1423                    " - couchstore returned error: " +
1424                    couchstore_strerror(errCode));
1425        }
1426    } else {
1427        // open failed - map couchstore error code to exception.
1428        std::errc ec;
1429        switch (errCode) {
1430            case COUCHSTORE_ERROR_OPEN_FILE:
1431                ec = std::errc::no_such_file_or_directory; break;
1432            default:
1433                ec = std::errc::io_error; break;
1434        }
1435        throw std::system_error(
1436                std::make_error_code(ec),
1437                "CouchKVStore::getDbInfo: failed to open database file for "
1438                "vBucket = " +
1439                        std::to_string(vbid) + " rev = " +
1440                        std::to_string(db.getFileRev()) + " with error:" +
1441                        couchstore_strerror(errCode));
1442    }
1443}
1444
1445void CouchKVStore::close() {
1446    intransaction = false;
1447}
1448
1449uint64_t CouchKVStore::checkNewRevNum(std::string &dbFileName, bool newFile) {
1450    uint64_t newrev = 0;
1451    std::string nameKey;
1452
1453    if (!newFile) {
1454        // extract out the file revision number first
1455        size_t secondDot = dbFileName.rfind(".");
1456        nameKey = dbFileName.substr(0, secondDot);
1457    } else {
1458        nameKey = dbFileName;
1459    }
1460    nameKey.append(".");
1461    const auto files = cb::io::findFilesWithPrefix(nameKey);
1462    std::vector<std::string>::const_iterator itor;
1463    // found file(s) whoes name has the same key name pair with different
1464    // revision number
1465    for (itor = files.begin(); itor != files.end(); ++itor) {
1466        const std::string &filename = *itor;
1467        if (endWithCompact(filename)) {
1468            continue;
1469        }
1470
1471        size_t secondDot = filename.rfind(".");
1472        char *ptr = NULL;
1473        uint64_t revnum = strtoull(filename.substr(secondDot + 1).c_str(), &ptr, 10);
1474        if (newrev < revnum) {
1475            newrev = revnum;
1476            dbFileName = filename;
1477        }
1478    }
1479    return newrev;
1480}
1481
1482void CouchKVStore::updateDbFileMap(uint16_t vbucketId, uint64_t newFileRev) {
1483    if (vbucketId >= numDbFiles) {
1484        logger.log(EXTENSION_LOG_WARNING,
1485                   "CouchKVStore::updateDbFileMap: Cannot update db file map "
1486                   "for an invalid vbucket, vb:%" PRIu16", rev:%" PRIu64,
1487                   vbucketId, newFileRev);
1488        return;
1489    }
1490    // MB-27963: obtain write access whilst we update the file map openDB also
1491    // obtains this mutex to ensure the fileRev it obtains doesn't become stale
1492    // by the time it hits sys_open.
1493    std::lock_guard<cb::WriterLock> lg(openDbMutex);
1494
1495    (*dbFileRevMap)[vbucketId] = newFileRev;
1496}
1497
1498couchstore_error_t CouchKVStore::openDB(uint16_t vbucketId,
1499                                        DbHolder& db,
1500                                        couchstore_open_flags options,
1501                                        FileOpsInterface* ops) {
1502    // MB-27963: obtain read access whilst we open the file, updateDbFileMap
1503    // serialises on this mutex so we can be sure the fileRev we read should
1504    // still be a valid file once we hit sys_open
1505    std::lock_guard<cb::ReaderLock> lg(openDbMutex);
1506    uint64_t fileRev = (*dbFileRevMap)[vbucketId];
1507    return openSpecificDB(vbucketId, fileRev, db, options, ops);
1508}
1509
1510couchstore_error_t CouchKVStore::openSpecificDB(uint16_t vbucketId,
1511                                                uint64_t fileRev,
1512                                                DbHolder& db,
1513                                                couchstore_open_flags options,
1514                                                FileOpsInterface* ops) {
1515    std::string dbFileName = getDBFileName(dbname, vbucketId, fileRev);
1516    db.setFileRev(fileRev); // save the rev so the caller can log it
1517
1518    if(ops == nullptr) {
1519        ops = statCollectingFileOps.get();
1520    }
1521
1522    couchstore_error_t errorCode = COUCHSTORE_SUCCESS;
1523
1524    /**
1525     * This flag disables IO buffering in couchstore which means
1526     * file operations will trigger syscalls immediately. This has
1527     * a detrimental impact on performance and is only intended
1528     * for testing.
1529     */
1530    if(!configuration.getBuffered()) {
1531        options |= COUCHSTORE_OPEN_FLAG_UNBUFFERED;
1532    }
1533
1534    errorCode = couchstore_open_db_ex(
1535            dbFileName.c_str(), options, ops, db.getDbAddress());
1536
1537    /* update command statistics */
1538    st.numOpen++;
1539    if (errorCode) {
1540        st.numOpenFailure++;
1541        logger.log(EXTENSION_LOG_WARNING,
1542                   "CouchKVStore::openDB: error:%s [%s],"
1543                   " name:%s, option:%" PRIX64 ", fileRev:%" PRIu64,
1544                   couchstore_strerror(errorCode),
1545                   cb_strerror().c_str(),
1546                   dbFileName.c_str(),
1547                   options,
1548                   fileRev);
1549
1550        if (errorCode == COUCHSTORE_ERROR_NO_SUCH_FILE) {
1551            auto dotPos = dbFileName.find_last_of(".");
1552            if (dotPos != std::string::npos) {
1553                dbFileName = dbFileName.substr(0, dotPos);
1554            }
1555            auto files = cb::io::findFilesWithPrefix(dbFileName);
1556            logger.log(
1557                    EXTENSION_LOG_WARNING,
1558                    "CouchKVStore::openDB: No such file, found:%zd alternative "
1559                    "files for %s",
1560                    files.size(),
1561                    dbFileName.c_str());
1562            for (const auto& f : files) {
1563                logger.log(EXTENSION_LOG_WARNING,
1564                           "CouchKVStore::openDB: Found %s",
1565                           f.c_str());
1566            }
1567        }
1568    }
1569
1570    return errorCode;
1571}
1572
1573void CouchKVStore::populateFileNameMap(std::vector<std::string> &filenames,
1574                                       std::vector<uint16_t> *vbids) {
1575    std::vector<std::string>::iterator fileItr;
1576
1577    for (fileItr = filenames.begin(); fileItr != filenames.end(); ++fileItr) {
1578        const std::string &filename = *fileItr;
1579        size_t secondDot = filename.rfind(".");
1580        std::string nameKey = filename.substr(0, secondDot);
1581        size_t firstDot = nameKey.rfind(".");
1582#ifdef _MSC_VER
1583        size_t firstSlash = nameKey.rfind("\\");
1584#else
1585        size_t firstSlash = nameKey.rfind("/");
1586#endif
1587
1588        std::string revNumStr = filename.substr(secondDot + 1);
1589        char *ptr = NULL;
1590        uint64_t revNum = strtoull(revNumStr.c_str(), &ptr, 10);
1591
1592        std::string vbIdStr = nameKey.substr(firstSlash + 1,
1593                                            (firstDot - firstSlash) - 1);
1594        if (allDigit(vbIdStr)) {
1595            int vbId = atoi(vbIdStr.c_str());
1596            if (vbids) {
1597                vbids->push_back(static_cast<uint16_t>(vbId));
1598            }
1599            uint64_t old_rev_num = (*dbFileRevMap)[vbId];
1600            if (old_rev_num == revNum) {
1601                continue;
1602            } else if (old_rev_num < revNum) { // stale revision found
1603                (*dbFileRevMap)[vbId] = revNum;
1604            } else { // stale file found (revision id has rolled over)
1605                old_rev_num = revNum;
1606            }
1607            std::stringstream old_file;
1608            old_file << dbname << "/" << vbId << ".couch." << old_rev_num;
1609            if (access(old_file.str().c_str(), F_OK) == 0) {
1610                if (!isReadOnly()) {
1611                    if (remove(old_file.str().c_str()) == 0) {
1612                        logger.log(EXTENSION_LOG_INFO,
1613                                  "CouchKVStore::populateFileNameMap: Removed "
1614                                  "stale file:%s", old_file.str().c_str());
1615                    } else {
1616                        logger.log(EXTENSION_LOG_WARNING,
1617                                   "CouchKVStore::populateFileNameMap: remove "
1618                                   "error:%s, file:%s", cb_strerror().c_str(),
1619                                   old_file.str().c_str());
1620                    }
1621                } else {
1622                    logger.log(EXTENSION_LOG_WARNING,
1623                               "CouchKVStore::populateFileNameMap: A read-only "
1624                               "instance of the underlying store "
1625                               "was not allowed to delete a stale file:%s",
1626                               old_file.str().c_str());
1627                }
1628            }
1629        } else {
1630            // skip non-vbucket database file, master.couch etc
1631            logger.log(EXTENSION_LOG_DEBUG,
1632                       "CouchKVStore::populateFileNameMap: Non-vbucket database file, %s, skip adding "
1633                       "to CouchKVStore dbFileMap", filename.c_str());
1634        }
1635    }
1636}
1637
1638couchstore_error_t CouchKVStore::fetchDoc(Db* db,
1639                                          DocInfo* docinfo,
1640                                          GetValue& docValue,
1641                                          uint16_t vbId,
1642                                          GetMetaOnly metaOnly) {
1643    couchstore_error_t errCode = COUCHSTORE_SUCCESS;
1644    std::unique_ptr<MetaData> metadata;
1645    try {
1646        metadata = MetaDataFactory::createMetaData(docinfo->rev_meta);
1647    } catch (std::logic_error&) {
1648        return COUCHSTORE_ERROR_DB_NO_LONGER_VALID;
1649    }
1650
1651    if (metaOnly == GetMetaOnly::Yes) {
1652        // Collections: TODO: Permanently restore to stored namespace
1653        auto it = std::make_unique<Item>(
1654                makeDocKey(docinfo->id,
1655                           configuration.shouldPersistDocNamespace()),
1656                metadata->getFlags(),
1657                metadata->getExptime(),
1658                nullptr,
1659                docinfo->size,
1660                metadata->getDataType(),
1661                metadata->getCas(),
1662                docinfo->db_seq,
1663                vbId);
1664
1665        it->setRevSeqno(docinfo->rev_seq);
1666
1667        if (docinfo->deleted) {
1668            it->setDeleted();
1669        }
1670        docValue = GetValue(std::move(it));
1671        // update ep-engine IO stats
1672        ++st.io_bg_fetch_docs_read;
1673        st.io_bgfetch_doc_bytes += (docinfo->id.size + docinfo->rev_meta.size);
1674    } else {
1675        Doc *doc = nullptr;
1676        size_t valuelen = 0;
1677        void* valuePtr = nullptr;
1678        protocol_binary_datatype_t datatype = PROTOCOL_BINARY_RAW_BYTES;
1679        errCode = couchstore_open_doc_with_docinfo(db, docinfo, &doc,
1680                                                   DECOMPRESS_DOC_BODIES);
1681        if (errCode == COUCHSTORE_SUCCESS) {
1682            if (doc == nullptr) {
1683                throw std::logic_error("CouchKVStore::fetchDoc: doc is NULL");
1684            }
1685
1686            if (doc->id.size > UINT16_MAX) {
1687                throw std::logic_error("CouchKVStore::fetchDoc: "
1688                            "doc->id.size (which is" +
1689                            std::to_string(doc->id.size) + ") is greater than "
1690                            + std::to_string(UINT16_MAX));
1691            }
1692
1693            valuelen = doc->data.size;
1694            valuePtr = doc->data.buf;
1695
1696            if (metadata->getVersionInitialisedFrom() == MetaData::Version::V0) {
1697                // This is a super old version of a couchstore file.
1698                // Try to determine if the document is JSON or raw bytes
1699                datatype = determine_datatype(doc->data);
1700            } else {
1701                datatype = metadata->getDataType();
1702            }
1703        } else if (errCode == COUCHSTORE_ERROR_DOC_NOT_FOUND && docinfo->deleted) {
1704            datatype = metadata->getDataType();
1705        } else {
1706            return errCode;
1707        }
1708
1709        try {
1710            // Collections: TODO: Restore to stored namespace
1711            auto it = std::make_unique<Item>(
1712                    makeDocKey(docinfo->id,
1713                               configuration.shouldPersistDocNamespace()),
1714                    metadata->getFlags(),
1715                    metadata->getExptime(),
1716                    valuePtr,
1717                    valuelen,
1718                    datatype,
1719                    metadata->getCas(),
1720                    docinfo->db_seq,
1721                    vbId,
1722                    docinfo->rev_seq);
1723
1724             if (docinfo->deleted) {
1725                 it->setDeleted();
1726             }
1727             docValue = GetValue(std::move(it));
1728        } catch (std::bad_alloc&) {
1729            couchstore_free_document(doc);
1730            return COUCHSTORE_ERROR_ALLOC_FAIL;
1731        }
1732
1733        // update ep-engine IO stats
1734        ++st.io_bg_fetch_docs_read;
1735        st.io_bgfetch_doc_bytes +=
1736                (docinfo->id.size + docinfo->rev_meta.size + valuelen);
1737
1738        couchstore_free_document(doc);
1739    }
1740    return COUCHSTORE_SUCCESS;
1741}
1742
1743int CouchKVStore::recordDbDump(Db *db, DocInfo *docinfo, void *ctx) {
1744
1745    ScanContext* sctx = static_cast<ScanContext*>(ctx);
1746    auto* cb = sctx->callback.get();
1747    auto* cl = sctx->lookup.get();
1748
1749    Doc *doc = nullptr;
1750    sized_buf value{nullptr, 0};
1751    uint64_t byseqno = docinfo->db_seq;
1752    uint16_t vbucketId = sctx->vbid;
1753
1754    sized_buf key = docinfo->id;
1755    if (key.size > UINT16_MAX) {
1756        throw std::invalid_argument("CouchKVStore::recordDbDump: "
1757                        "docinfo->id.size (which is " + std::to_string(key.size) +
1758                        ") is greater than " + std::to_string(UINT16_MAX));
1759    }
1760
1761    // Collections: TODO: Permanently restore to stored namespace
1762    DocKey docKey = makeDocKey(
1763            docinfo->id, sctx->config.shouldPersistDocNamespace());
1764
1765    if (sctx->collectionsContext.manageSeparator(docKey)) {
1766        sctx->lastReadSeqno = byseqno;
1767        return COUCHSTORE_SUCCESS;
1768    }
1769
1770    CacheLookup lookup(docKey,
1771                       byseqno,
1772                       vbucketId,
1773                       sctx->collectionsContext.getSeparator());
1774    cl->callback(lookup);
1775    if (cl->getStatus() == ENGINE_KEY_EEXISTS) {
1776        sctx->lastReadSeqno = byseqno;
1777        return COUCHSTORE_SUCCESS;
1778    } else if (cl->getStatus() == ENGINE_ENOMEM) {
1779        return COUCHSTORE_ERROR_CANCEL;
1780    }
1781
1782    auto metadata = MetaDataFactory::createMetaData(docinfo->rev_meta);
1783
1784    if (sctx->valFilter != ValueFilter::KEYS_ONLY) {
1785        couchstore_open_options openOptions = 0;
1786
1787        /**
1788         * If the stored document has V0 metdata (no datatype)
1789         * or no special request is made to retrieve compressed documents
1790         * as is, then DECOMPRESS the document and update datatype
1791         */
1792        if (docinfo->rev_meta.size == metadata->getMetaDataSize(MetaData::Version::V0) ||
1793            sctx->valFilter == ValueFilter::VALUES_DECOMPRESSED) {
1794            openOptions = DECOMPRESS_DOC_BODIES;
1795        }
1796
1797        auto errCode = couchstore_open_doc_with_docinfo(db, docinfo, &doc,
1798                                                        openOptions);
1799
1800        if (errCode == COUCHSTORE_SUCCESS) {
1801            value = doc->data;
1802            if (doc->data.size) {
1803                if ((openOptions & DECOMPRESS_DOC_BODIES) == 0) {
1804                    // We always store the document bodies compressed on disk,
1805                    // but now the client _wanted_ to fetch the document
1806                    // in a compressed mode.
1807                    // We've never stored the "compressed" flag on disk
1808                    // (as we don't keep items compressed in memory).
1809                    // Update the datatype flag for this item to
1810                    // reflect that it is compressed so that the
1811                    // receiver of the object may notice (Note:
1812                    // this is currently _ONLY_ happening via DCP
1813                     auto datatype = metadata->getDataType();
1814                     metadata->setDataType(datatype | PROTOCOL_BINARY_DATATYPE_SNAPPY);
1815                } else if (metadata->getVersionInitialisedFrom() == MetaData::Version::V0) {
1816                    // This is a super old version of a couchstore file.
1817                    // Try to determine if the document is JSON or raw bytes
1818                    metadata->setDataType(determine_datatype(doc->data));
1819                }
1820            } else {
1821                // No data, it cannot have a datatype!
1822                metadata->setDataType(PROTOCOL_BINARY_RAW_BYTES);
1823            }
1824        } else if (errCode != COUCHSTORE_ERROR_DOC_NOT_FOUND) {
1825            sctx->logger->log(EXTENSION_LOG_WARNING,
1826                              "CouchKVStore::recordDbDump: "
1827                              "couchstore_open_doc_with_docinfo error:%s [%s], "
1828                              "vb:%" PRIu16 ", seqno:%" PRIu64,
1829                              couchstore_strerror(errCode),
1830                              couchkvstore_strerrno(db, errCode).c_str(),
1831                              vbucketId, docinfo->rev_seq);
1832            return COUCHSTORE_SUCCESS;
1833        }
1834    }
1835
1836    // Collections: TODO: Permanently restore to stored namespace
1837    auto it = std::make_unique<Item>(
1838            DocKey(makeDocKey(key, sctx->config.shouldPersistDocNamespace())),
1839            metadata->getFlags(),
1840            metadata->getExptime(),
1841            value.buf,
1842            value.size,
1843            metadata->getDataType(),
1844            metadata->getCas(),
1845            docinfo->db_seq, // return seq number being persisted on disk
1846            vbucketId,
1847            docinfo->rev_seq);
1848
1849    if (docinfo->deleted) {
1850        it->setDeleted();
1851    }
1852
1853    bool onlyKeys = (sctx->valFilter == ValueFilter::KEYS_ONLY) ? true : false;
1854    GetValue rv(std::move(it), ENGINE_SUCCESS, -1, onlyKeys);
1855    cb->callback(rv);
1856
1857    couchstore_free_document(doc);
1858
1859    if (cb->getStatus() == ENGINE_ENOMEM) {
1860        return COUCHSTORE_ERROR_CANCEL;
1861    }
1862
1863    sctx->lastReadSeqno = byseqno;
1864    return COUCHSTORE_SUCCESS;
1865}
1866
1867bool CouchKVStore::commit2couchstore(const Item* collectionsManifest) {
1868    bool success = true;
1869
1870    size_t pendingCommitCnt = pendingReqsQ.size();
1871    if (pendingCommitCnt == 0 && !collectionsManifest) {
1872        return success;
1873    }
1874
1875    // Use the vbucket of the first item or the manifest item
1876    uint16_t vbucket2flush = pendingCommitCnt
1877                                     ? pendingReqsQ[0]->getVBucketId()
1878                                     : collectionsManifest->getVBucketId();
1879
1880    TRACE_EVENT2("CouchKVStore",
1881                 "commit2couchstore",
1882                 "vbid",
1883                 vbucket2flush,
1884                 "pendingCommitCnt",
1885                 pendingCommitCnt);
1886
1887    // When an item and a manifest are present, vbucket2flush is read from the
1888    // item. Check it matches the manifest
1889    if (pendingCommitCnt && collectionsManifest &&
1890        vbucket2flush != collectionsManifest->getVBucketId()) {
1891        throw std::logic_error(
1892                "CouchKVStore::commit2couchstore: manifest/item vbucket "
1893                "mismatch vbucket2flush:" +
1894                std::to_string(vbucket2flush) + " manifest vb:" +
1895                std::to_string(collectionsManifest->getVBucketId()));
1896    }
1897
1898    std::vector<Doc*> docs(pendingCommitCnt);
1899    std::vector<DocInfo*> docinfos(pendingCommitCnt);
1900
1901    for (size_t i = 0; i < pendingCommitCnt; ++i) {
1902        CouchRequest *req = pendingReqsQ[i];
1903        docs[i] = (Doc *)req->getDbDoc();
1904        docinfos[i] = req->getDbDocInfo();
1905        if (vbucket2flush != req->getVBucketId()) {
1906            throw std::logic_error(
1907                    "CouchKVStore::commit2couchstore: "
1908                    "mismatch between vbucket2flush (which is "
1909                    + std::to_string(vbucket2flush) + ") and pendingReqsQ["
1910                    + std::to_string(i) + "] (which is "
1911                    + std::to_string(req->getVBucketId()) + ")");
1912        }
1913    }
1914
1915    // The docinfo callback needs to know if the DocNamespace feature is on
1916    kvstats_ctx kvctx(configuration.shouldPersistDocNamespace());
1917    // flush all
1918    couchstore_error_t errCode =
1919            saveDocs(vbucket2flush, docs, docinfos, kvctx, collectionsManifest);
1920
1921    if (errCode) {
1922        success = false;
1923        logger.log(EXTENSION_LOG_WARNING,
1924                   "CouchKVStore::commit2couchstore: saveDocs error:%s, "
1925                   "vb:%" PRIu16,
1926                   couchstore_strerror(errCode),
1927                   vbucket2flush);
1928    }
1929
1930    commitCallback(pendingReqsQ, kvctx, errCode);
1931
1932    // clean up
1933    for (size_t i = 0; i < pendingCommitCnt; ++i) {
1934        delete pendingReqsQ[i];
1935    }
1936    pendingReqsQ.clear();
1937    return success;
1938}
1939
1940static int readDocInfos(Db *db, DocInfo *docinfo, void *ctx) {
1941    if (ctx == nullptr) {
1942        throw std::invalid_argument("readDocInfos: ctx must be non-NULL");
1943    }
1944    if (!docinfo) {
1945        throw std::invalid_argument("readDocInfos: docInfo must be non-NULL");
1946    }
1947    kvstats_ctx* cbCtx = static_cast<kvstats_ctx*>(ctx);
1948    if(docinfo) {
1949        // An item exists in the VB DB file.
1950        if (!docinfo->deleted) {
1951            // Collections: TODO: Permanently restore to stored namespace
1952            auto itr = cbCtx->keyStats.find(
1953                    makeDocKey(docinfo->id, cbCtx->persistDocNamespace));
1954            if (itr != cbCtx->keyStats.end()) {
1955                itr->second = true;
1956            }
1957        }
1958    }
1959    return 0;
1960}
1961
1962couchstore_error_t CouchKVStore::saveDocs(uint16_t vbid,
1963                                          const std::vector<Doc*>& docs,
1964                                          std::vector<DocInfo*>& docinfos,
1965                                          kvstats_ctx& kvctx,
1966                                          const Item* collectionsManifest) {
1967    couchstore_error_t errCode;
1968    DbInfo info;
1969    DbHolder db(*this);
1970    errCode = openDB(vbid, db, COUCHSTORE_OPEN_FLAG_CREATE);
1971    if (errCode != COUCHSTORE_SUCCESS) {
1972        logger.log(EXTENSION_LOG_WARNING,
1973                   "CouchKVStore::saveDocs: openDB error:%s, vb:%" PRIu16
1974                   ", rev:%" PRIu64 ", numdocs:%" PRIu64,
1975                   couchstore_strerror(errCode),
1976                   vbid,
1977                   db.getFileRev(),
1978                   uint64_t(docs.size()));
1979        return errCode;
1980    } else {
1981        vbucket_state* state = getVBucketState(vbid);
1982        if (state == nullptr) {
1983            throw std::logic_error(
1984                    "CouchKVStore::saveDocs: cachedVBStates[" +
1985                    std::to_string(vbid) + "] is NULL");
1986        }
1987
1988        uint64_t maxDBSeqno = 0;
1989
1990        // Only do a couchstore_save_documents if there are docs
1991        if (docs.size() > 0) {
1992            std::vector<sized_buf> ids(docs.size());
1993            for (size_t idx = 0; idx < docs.size(); idx++) {
1994                ids[idx] = docinfos[idx]->id;
1995                maxDBSeqno = std::max(maxDBSeqno, docinfos[idx]->db_seq);
1996                DocKey key = makeDocKey(
1997                        ids[idx], configuration.shouldPersistDocNamespace());
1998                kvctx.keyStats[key] = false;
1999            }
2000            errCode = couchstore_docinfos_by_id(db,
2001                                                ids.data(),
2002                                                (unsigned)ids.size(),
2003                                                0,
2004                                                readDocInfos,
2005                                                &kvctx);
2006            if (errCode != COUCHSTORE_SUCCESS) {
2007                logger.log(EXTENSION_LOG_WARNING,
2008                           "CouchKVStore::saveDocs: couchstore_docinfos_by_id "
2009                           "error:%s [%s], vb:%" PRIu16 ", numdocs:%" PRIu64,
2010                           couchstore_strerror(errCode),
2011                           couchkvstore_strerrno(db, errCode).c_str(),
2012                           vbid,
2013                           uint64_t(docs.size()));
2014                return errCode;
2015            }
2016
2017            auto cs_begin = ProcessClock::now();
2018            uint64_t flags = COMPRESS_DOC_BODIES | COUCHSTORE_SEQUENCE_AS_IS;
2019            errCode = couchstore_save_documents(db,
2020                                                docs.data(),
2021                                                docinfos.data(),
2022                                                (unsigned)docs.size(),
2023                                                flags);
2024            st.saveDocsHisto.add(
2025                    std::chrono::duration_cast<std::chrono::microseconds>(
2026                            ProcessClock::now() - cs_begin));
2027            if (errCode != COUCHSTORE_SUCCESS) {
2028                logger.log(EXTENSION_LOG_WARNING,
2029                           "CouchKVStore::saveDocs: couchstore_save_documents "
2030                           "error:%s [%s], vb:%" PRIu16 ", numdocs:%" PRIu64,
2031                           couchstore_strerror(errCode),
2032                           couchkvstore_strerrno(db, errCode).c_str(),
2033                           vbid,
2034                           uint64_t(docs.size()));
2035                return errCode;
2036            }
2037        }
2038
2039        errCode = saveVBState(db, *state);
2040        if (errCode != COUCHSTORE_SUCCESS) {
2041            logger.log(EXTENSION_LOG_WARNING,
2042                       "CouchKVStore::saveDocs: saveVBState error:%s [%s]",
2043                       couchstore_strerror(errCode),
2044                       couchkvstore_strerrno(db, errCode).c_str());
2045            return errCode;
2046        }
2047
2048        if (collectionsManifest) {
2049            saveCollectionsManifest(*db, *collectionsManifest);
2050        }
2051
2052        auto cs_begin = ProcessClock::now();
2053        errCode = couchstore_commit(db);
2054        st.commitHisto.add(
2055                std::chrono::duration_cast<std::chrono::microseconds>(
2056                        ProcessClock::now() - cs_begin));
2057        if (errCode) {
2058            logger.log(
2059                    EXTENSION_LOG_WARNING,
2060                    "CouchKVStore::saveDocs: couchstore_commit error:%s [%s]",
2061                    couchstore_strerror(errCode),
2062                    couchkvstore_strerrno(db, errCode).c_str());
2063            return errCode;
2064        }
2065
2066        st.batchSize.add(docs.size());
2067
2068        // retrieve storage system stats for file fragmentation computation
2069        errCode = couchstore_db_info(db, &info);
2070        if (errCode) {
2071            logger.log(
2072                    EXTENSION_LOG_WARNING,
2073                    "CouchKVStore::saveDocs: couchstore_db_info error:%s [%s]",
2074                    couchstore_strerror(errCode),
2075                    couchkvstore_strerrno(db, errCode).c_str());
2076            return errCode;
2077        }
2078        cachedSpaceUsed[vbid] = info.space_used;
2079        cachedFileSize[vbid] = info.file_size;
2080        cachedDeleteCount[vbid] = info.deleted_count;
2081        cachedDocCount[vbid] = info.doc_count;
2082
2083        // Check seqno if we wrote documents
2084        if (docs.size() > 0 && maxDBSeqno != info.last_sequence) {
2085            logger.log(EXTENSION_LOG_WARNING,
2086                       "CouchKVStore::saveDocs: Seqno in db header (%" PRIu64 ")"
2087                       " is not matched with what was persisted (%" PRIu64 ")"
2088                       " for vb:%" PRIu16,
2089                       info.last_sequence, maxDBSeqno, vbid);
2090        }
2091        state->highSeqno = info.last_sequence;
2092    }
2093
2094    /* update stat */
2095    if(errCode == COUCHSTORE_SUCCESS) {
2096        st.docsCommitted = docs.size();
2097    }
2098
2099    return errCode;
2100}
2101
2102void CouchKVStore::commitCallback(std::vector<CouchRequest *> &committedReqs,
2103                                  kvstats_ctx &kvctx,
2104                                  couchstore_error_t errCode) {
2105    size_t commitSize = committedReqs.size();
2106
2107    for (size_t index = 0; index < commitSize; index++) {
2108        size_t dataSize = committedReqs[index]->getNBytes();
2109        size_t keySize = committedReqs[index]->getKey().size();
2110        /* update ep stats */
2111        ++st.io_num_write;
2112        st.io_write_bytes += (keySize + dataSize);
2113
2114        if (committedReqs[index]->isDelete()) {
2115            int rv = getMutationStatus(errCode);
2116            if (rv != -1) {
2117                const auto& key = committedReqs[index]->getKey();
2118                if (kvctx.keyStats[key]) {
2119                    rv = 1; // Deletion is for an existing item on DB file.
2120                } else {
2121                    rv = 0; // Deletion is for a non-existing item on DB file.
2122                }
2123            }
2124            if (errCode) {
2125                ++st.numDelFailure;
2126            } else {
2127                st.delTimeHisto.add(committedReqs[index]->getDelta());
2128            }
2129            committedReqs[index]->getDelCallback()->callback(*transactionCtx,
2130                                                             rv);
2131        } else {
2132            int rv = getMutationStatus(errCode);
2133            const auto& key = committedReqs[index]->getKey();
2134            bool insertion = !kvctx.keyStats[key];
2135            if (errCode) {
2136                ++st.numSetFailure;
2137            } else {
2138                st.writeTimeHisto.add(committedReqs[index]->getDelta());
2139                st.writeSizeHisto.add(dataSize + keySize);
2140            }
2141            mutation_result p(rv, insertion);
2142            committedReqs[index]->getSetCallback()->callback(*transactionCtx,
2143                                                             p);
2144        }
2145    }
2146}
2147
2148ENGINE_ERROR_CODE CouchKVStore::readVBState(Db *db, uint16_t vbId) {
2149    sized_buf id;
2150    LocalDoc *ldoc = NULL;
2151    couchstore_error_t errCode = COUCHSTORE_SUCCESS;
2152    vbucket_state_t state = vbucket_state_dead;
2153    uint64_t checkpointId = 0;
2154    uint64_t maxDeletedSeqno = 0;
2155    int64_t highSeqno = 0;
2156    std::string failovers;
2157    uint64_t purgeSeqno = 0;
2158    uint64_t lastSnapStart = 0;
2159    uint64_t lastSnapEnd = 0;
2160    uint64_t maxCas = 0;
2161    int64_t hlcCasEpochSeqno = HlcCasSeqnoUninitialised;
2162    bool mightContainXattrs = false;
2163
2164    DbInfo info;
2165    errCode = couchstore_db_info(db, &info);
2166    if (errCode == COUCHSTORE_SUCCESS) {
2167        highSeqno = info.last_sequence;
2168        purgeSeqno = info.purge_seq;
2169    } else {
2170        logger.log(EXTENSION_LOG_WARNING,
2171                   "CouchKVStore::readVBState: couchstore_db_info error:%s"
2172                   ", vb:%" PRIu16, couchstore_strerror(errCode), vbId);
2173        return couchErr2EngineErr(errCode);
2174    }
2175
2176    id.buf = (char *)"_local/vbstate";
2177    id.size = sizeof("_local/vbstate") - 1;
2178    errCode = couchstore_open_local_document(db, (void *)id.buf,
2179                                             id.size, &ldoc);
2180    if (errCode != COUCHSTORE_SUCCESS) {
2181        if (errCode == COUCHSTORE_ERROR_DOC_NOT_FOUND) {
2182            logger.log(EXTENSION_LOG_NOTICE,
2183                       "CouchKVStore::readVBState: '_local/vbstate' not found "
2184                       "for vb:%d", vbId);
2185        } else {
2186            logger.log(EXTENSION_LOG_WARNING,
2187                       "CouchKVStore::readVBState: couchstore_open_local_document"
2188                       " error:%s, vb:%" PRIu16, couchstore_strerror(errCode),
2189                       vbId);
2190        }
2191    } else {
2192        const std::string statjson(ldoc->json.buf, ldoc->json.size);
2193        cJSON *jsonObj = cJSON_Parse(statjson.c_str());
2194        if (!jsonObj) {
2195            couchstore_free_local_document(ldoc);
2196            logger.log(EXTENSION_LOG_WARNING, "CouchKVStore::readVBState: Failed to "
2197                       "parse the vbstat json doc for vb:%" PRIu16 ", json:%s",
2198                       vbId , statjson.c_str());
2199            return couchErr2EngineErr(errCode);
2200        }
2201
2202        const std::string vb_state = getJSONObjString(
2203                                cJSON_GetObjectItem(jsonObj, "state"));
2204        const std::string checkpoint_id = getJSONObjString(
2205                                cJSON_GetObjectItem(jsonObj,"checkpoint_id"));
2206        const std::string max_deleted_seqno = getJSONObjString(
2207                                cJSON_GetObjectItem(jsonObj, "max_deleted_seqno"));
2208        const std::string snapStart = getJSONObjString(
2209                                cJSON_GetObjectItem(jsonObj, "snap_start"));
2210        const std::string snapEnd = getJSONObjString(
2211                                cJSON_GetObjectItem(jsonObj, "snap_end"));
2212        const std::string maxCasValue = getJSONObjString(
2213                                cJSON_GetObjectItem(jsonObj, "max_cas"));
2214        const std::string hlcCasEpoch =
2215                getJSONObjString(cJSON_GetObjectItem(jsonObj, "hlc_epoch"));
2216        mightContainXattrs = getJSONObjBool(
2217                cJSON_GetObjectItem(jsonObj, "might_contain_xattrs"));
2218
2219        cJSON *failover_json = cJSON_GetObjectItem(jsonObj, "failover_table");
2220        if (vb_state.compare("") == 0 || checkpoint_id.compare("") == 0
2221                || max_deleted_seqno.compare("") == 0) {
2222            logger.log(EXTENSION_LOG_WARNING, "CouchKVStore::readVBState: State"
2223                       " JSON doc for vb:%" PRIu16 " is in the wrong format:%s, "
2224                       "vb state:%s, checkpoint id:%s and max deleted seqno:%s",
2225                       vbId, statjson.c_str(), vb_state.c_str(),
2226                       checkpoint_id.c_str(), max_deleted_seqno.c_str());
2227        } else {
2228            state = VBucket::fromString(vb_state.c_str());
2229            parseUint64(max_deleted_seqno.c_str(), &maxDeletedSeqno);
2230            parseUint64(checkpoint_id.c_str(), &checkpointId);
2231
2232            if (snapStart.compare("") == 0) {
2233                lastSnapStart = highSeqno;
2234            } else {
2235                parseUint64(snapStart.c_str(), &lastSnapStart);
2236            }
2237
2238            if (snapEnd.compare("") == 0) {
2239                lastSnapEnd = highSeqno;
2240            } else {
2241                parseUint64(snapEnd.c_str(), &lastSnapEnd);
2242            }
2243
2244            if (maxCasValue.compare("") != 0) {
2245                parseUint64(maxCasValue.c_str(), &maxCas);
2246
2247                // MB-17517: If the maxCas on disk was invalid then don't use it -
2248                // instead rebuild from the items we load from disk (i.e. as per
2249                // an upgrade from an earlier version).
2250                if (maxCas == static_cast<uint64_t>(-1)) {
2251                    logger.log(EXTENSION_LOG_WARNING,
2252                               "CouchKVStore::readVBState: Invalid max_cas "
2253                               "(0x%" PRIx64 ") read from '%s' for vb:%" PRIu16
2254                               ". Resetting max_cas to zero.",
2255                               maxCas, id.buf, vbId);
2256                    maxCas = 0;
2257                }
2258            }
2259
2260            if (!hlcCasEpoch.empty()) {
2261                parseInt64(hlcCasEpoch.c_str(), &hlcCasEpochSeqno);
2262            }
2263
2264            if (failover_json) {
2265                failovers = to_string(failover_json, false);
2266            }
2267        }
2268        cJSON_Delete(jsonObj);
2269        couchstore_free_local_document(ldoc);
2270    }
2271
2272    cachedVBStates[vbId] = std::make_unique<vbucket_state>(state,
2273                                                           checkpointId,
2274                                                           maxDeletedSeqno,
2275                                                           highSeqno,
2276                                                           purgeSeqno,
2277                                                           lastSnapStart,
2278                                                           lastSnapEnd,
2279                                                           maxCas,
2280                                                           hlcCasEpochSeqno,
2281                                                           mightContainXattrs,
2282                                                           failovers);
2283
2284    return couchErr2EngineErr(errCode);
2285}
2286
2287couchstore_error_t CouchKVStore::saveVBState(Db *db,
2288                                             const vbucket_state &vbState) {
2289    std::stringstream jsonState;
2290
2291    jsonState << "{\"state\": \"" << VBucket::toString(vbState.state) << "\""
2292              << ",\"checkpoint_id\": \"" << vbState.checkpointId << "\""
2293              << ",\"max_deleted_seqno\": \"" << vbState.maxDeletedSeqno << "\"";
2294    if (!vbState.failovers.empty()) {
2295        jsonState << ",\"failover_table\": " << vbState.failovers;
2296    }
2297    jsonState << ",\"snap_start\": \"" << vbState.lastSnapStart << "\""
2298              << ",\"snap_end\": \"" << vbState.lastSnapEnd << "\""
2299              << ",\"max_cas\": \"" << vbState.maxCas << "\""
2300              << ",\"hlc_epoch\": \"" << vbState.hlcCasEpochSeqno << "\"";
2301
2302    if (vbState.mightContainXattrs) {
2303        jsonState << ",\"might_contain_xattrs\": true";
2304    } else {
2305        jsonState << ",\"might_contain_xattrs\": false";
2306    }
2307
2308    jsonState << "}";
2309
2310    LocalDoc lDoc;
2311    lDoc.id.buf = (char *)"_local/vbstate";
2312    lDoc.id.size = sizeof("_local/vbstate") - 1;
2313    std::string state = jsonState.str();
2314    lDoc.json.buf = (char *)state.c_str();
2315    lDoc.json.size = state.size();
2316    lDoc.deleted = 0;
2317
2318    couchstore_error_t errCode = couchstore_save_local_document(db, &lDoc);
2319    if (errCode != COUCHSTORE_SUCCESS) {
2320        logger.log(EXTENSION_LOG_WARNING,
2321                   "CouchKVStore::saveVBState couchstore_save_local_document "
2322                   "error:%s [%s]", couchstore_strerror(errCode),
2323                   couchkvstore_strerrno(db, errCode).c_str());
2324    }
2325
2326    return errCode;
2327}
2328
2329couchstore_error_t CouchKVStore::saveCollectionsManifest(
2330        Db& db, const Item& collectionsManifest) {
2331    LocalDoc lDoc;
2332    lDoc.id.buf = const_cast<char*>(Collections::CouchstoreManifest);
2333    lDoc.id.size = Collections::CouchstoreManifestLen;
2334
2335    // Convert the Item into JSON
2336    std::string state =
2337            Collections::VB::Manifest::serialToJson(collectionsManifest);
2338
2339    lDoc.json.buf = const_cast<char*>(state.c_str());
2340    lDoc.json.size = state.size();
2341    lDoc.deleted = 0;
2342
2343    couchstore_error_t errCode = couchstore_save_local_document(&db, &lDoc);
2344
2345    if (errCode != COUCHSTORE_SUCCESS) {
2346        logger.log(EXTENSION_LOG_WARNING,
2347                   "CouchKVStore::saveCollectionsManifest "
2348                   "couchstore_save_local_document "
2349                   "error:%s [%s]",
2350                   couchstore_strerror(errCode),
2351                   couchkvstore_strerrno(&db, errCode).c_str());
2352    }
2353
2354    return errCode;
2355}
2356
2357std::string CouchKVStore::readCollectionsManifest(Db& db) {
2358    sized_buf id;
2359    id.buf = const_cast<char*>(Collections::CouchstoreManifest);
2360    id.size = sizeof(Collections::CouchstoreManifest) - 1;
2361
2362    LocalDocHolder lDoc;
2363    auto errCode = couchstore_open_local_document(
2364            &db, (void*)id.buf, id.size, lDoc.getLocalDocAddress());
2365    if (errCode != COUCHSTORE_SUCCESS) {
2366        if (errCode == COUCHSTORE_ERROR_DOC_NOT_FOUND) {
2367            logger.log(EXTENSION_LOG_NOTICE,
2368                   "CouchKVStore::readCollectionsManifest: doc not found");
2369        } else {
2370            logger.log(EXTENSION_LOG_WARNING,
2371                   "CouchKVStore::readCollectionsManifest: "
2372                   "couchstore_open_local_document error:%s",
2373                   couchstore_strerror(errCode));
2374        }
2375
2376        return {};
2377    }
2378
2379    return {lDoc.getLocalDoc()->json.buf, lDoc.getLocalDoc()->json.size};
2380}
2381
2382int CouchKVStore::getMultiCb(Db *db, DocInfo *docinfo, void *ctx) {
2383    if (docinfo == nullptr) {
2384        throw std::invalid_argument("CouchKVStore::getMultiCb: docinfo "
2385                "must be non-NULL");
2386    }
2387    if (ctx == nullptr) {
2388        throw std::invalid_argument("CouchKVStore::getMultiCb: ctx must "
2389                "be non-NULL");
2390    }
2391
2392    GetMultiCbCtx *cbCtx = static_cast<GetMultiCbCtx *>(ctx);
2393    // Collections: TODO: Permanently restore to stored namespace
2394    DocKey key = makeDocKey(docinfo->id,
2395                            cbCtx->cks.getConfig().shouldPersistDocNamespace());
2396    KVStoreStats& st = cbCtx->cks.getKVStoreStat();
2397
2398    vb_bgfetch_queue_t::iterator qitr = cbCtx->fetches.find(key);
2399    if (qitr == cbCtx->fetches.end()) {
2400        // this could be a serious race condition in couchstore,
2401        // log a warning message and continue
2402        cbCtx->cks.logger.log(EXTENSION_LOG_WARNING,
2403                              "CouchKVStore::getMultiCb: Couchstore returned "
2404                              "invalid docinfo, no pending bgfetch has been "
2405                              "issued for a key in vb:%" PRIu16 ", "
2406                              "seqno:%" PRIu64, cbCtx->vbId, docinfo->rev_seq);
2407        return 0;
2408    }
2409
2410    vb_bgfetch_item_ctx_t& bg_itm_ctx = (*qitr).second;
2411    GetMetaOnly meta_only = bg_itm_ctx.isMetaOnly;
2412
2413    couchstore_error_t errCode = cbCtx->cks.fetchDoc(
2414            db, docinfo, bg_itm_ctx.value, cbCtx->vbId, meta_only);
2415    if (errCode != COUCHSTORE_SUCCESS && (meta_only == GetMetaOnly::No)) {
2416        st.numGetFailure++;
2417    }
2418
2419    bg_itm_ctx.value.setStatus(cbCtx->cks.couchErr2EngineErr(errCode));
2420
2421    bool return_val_ownership_transferred = false;
2422    for (auto& fetch : bg_itm_ctx.bgfetched_list) {
2423        return_val_ownership_transferred = true;
2424        // populate return value for remaining fetch items with the
2425        // same seqid
2426        fetch->value = &bg_itm_ctx.value;
2427        st.readTimeHisto.add(
2428                std::chrono::duration_cast<std::chrono::microseconds>(
2429                        ProcessClock::now() - fetch->initTime));
2430        if (errCode == COUCHSTORE_SUCCESS) {
2431            st.readSizeHisto.add(bg_itm_ctx.value.item->getKey().size() +
2432                                 bg_itm_ctx.value.item->getNBytes());
2433        }
2434    }
2435    if (!return_val_ownership_transferred) {
2436        cbCtx->cks.logger.log(EXTENSION_LOG_WARNING,
2437                              "CouchKVStore::getMultiCb called with zero"
2438                              "items in bgfetched_list, vb:%" PRIu16
2439                              ", seqno:%" PRIu64,
2440                              cbCtx->vbId, docinfo->rev_seq);
2441    }
2442
2443    return 0;
2444}
2445
2446
2447void CouchKVStore::closeDatabaseHandle(Db *db) {
2448    couchstore_error_t ret = couchstore_close_file(db);
2449    if (ret != COUCHSTORE_SUCCESS) {
2450        logger.log(EXTENSION_LOG_WARNING,
2451                   "CouchKVStore::closeDatabaseHandle: couchstore_close_file "
2452                   "error:%s [%s]", couchstore_strerror(ret),
2453                   couchkvstore_strerrno(db, ret).c_str());
2454    }
2455    ret = couchstore_free_db(db);
2456    if (ret != COUCHSTORE_SUCCESS) {
2457        logger.log(EXTENSION_LOG_WARNING,
2458                   "CouchKVStore::closeDatabaseHandle: couchstore_free_db "
2459                   "error:%s [%s]", couchstore_strerror(ret),
2460                   couchkvstore_strerrno(nullptr, ret).c_str());
2461    }
2462    st.numClose++;
2463}
2464
2465ENGINE_ERROR_CODE CouchKVStore::couchErr2EngineErr(couchstore_error_t errCode) {
2466    switch (errCode) {
2467    case COUCHSTORE_SUCCESS:
2468        return ENGINE_SUCCESS;
2469    case COUCHSTORE_ERROR_ALLOC_FAIL:
2470        return ENGINE_ENOMEM;
2471    case COUCHSTORE_ERROR_DOC_NOT_FOUND:
2472        return ENGINE_KEY_ENOENT;
2473    case COUCHSTORE_ERROR_NO_SUCH_FILE:
2474    case COUCHSTORE_ERROR_NO_HEADER:
2475    default:
2476        // same as the general error return code of
2477        // EPBucket::getInternal
2478        return ENGINE_TMPFAIL;
2479    }
2480}
2481
2482size_t CouchKVStore::getNumPersistedDeletes(uint16_t vbid) {
2483    size_t delCount = cachedDeleteCount[vbid];
2484    if (delCount != (size_t) -1) {
2485        return delCount;
2486    }
2487
2488    DbHolder db(*this);
2489    couchstore_error_t errCode = openDB(vbid, db, COUCHSTORE_OPEN_FLAG_RDONLY);
2490    if (errCode == COUCHSTORE_SUCCESS) {
2491        DbInfo info;
2492        errCode = couchstore_db_info(db, &info);
2493        if (errCode == COUCHSTORE_SUCCESS) {
2494            cachedDeleteCount[vbid] = info.deleted_count;
2495            return info.deleted_count;
2496        } else {
2497            throw std::runtime_error(
2498                    "CouchKVStore::getNumPersistedDeletes:"
2499                    "Failed to read database info for vBucket = " +
2500                    std::to_string(vbid) + " rev = " +
2501                    std::to_string(db.getFileRev()) + " with error:" +
2502                    couchstore_strerror(errCode));
2503        }
2504    } else {
2505        // open failed - map couchstore error code to exception.
2506        std::errc ec;
2507        switch (errCode) {
2508            case COUCHSTORE_ERROR_OPEN_FILE:
2509                ec = std::errc::no_such_file_or_directory; break;
2510            default:
2511                ec = std::errc::io_error; break;
2512        }
2513        throw std::system_error(std::make_error_code(ec),
2514                                "CouchKVStore::getNumPersistedDeletes:"
2515                                "Failed to open database file for vBucket = " +
2516                                        std::to_string(vbid) + " rev = " +
2517                                        std::to_string(db.getFileRev()) +
2518                                        " with error:" +
2519                                        couchstore_strerror(errCode));
2520    }
2521    return 0;
2522}
2523
2524DBFileInfo CouchKVStore::getDbFileInfo(uint16_t vbid) {
2525
2526    DbInfo info = getDbInfo(vbid);
2527    return DBFileInfo{info.file_size, info.space_used};
2528}
2529
2530DBFileInfo CouchKVStore::getAggrDbFileInfo() {
2531    DBFileInfo kvsFileInfo;
2532    /**
2533     * Iterate over all the vbuckets to get the total.
2534     * If the vbucket is dead, then its value would
2535     * be zero.
2536     */
2537    for (uint16_t vbid = 0; vbid < numDbFiles; vbid++) {
2538        kvsFileInfo.fileSize += cachedFileSize[vbid].load();
2539        kvsFileInfo.spaceUsed += cachedSpaceUsed[vbid].load();
2540    }
2541    return kvsFileInfo;
2542}
2543
2544size_t CouchKVStore::getNumItems(uint16_t vbid, uint64_t min_seq,
2545                                 uint64_t max_seq) {
2546    DbHolder db(*this);
2547    uint64_t count = 0;
2548    couchstore_error_t errCode = openDB(vbid, db, COUCHSTORE_OPEN_FLAG_RDONLY);
2549    if (errCode == COUCHSTORE_SUCCESS) {
2550        errCode = couchstore_changes_count(db, min_seq, max_seq, &count);
2551        if (errCode != COUCHSTORE_SUCCESS) {
2552            throw std::runtime_error(
2553                    "CouchKVStore::getNumItems: Failed to "
2554                    "get changes count for vBucket = " +
2555                    std::to_string(vbid) + " rev = " +
2556                    std::to_string(db.getFileRev()) + " with error:" +
2557                    couchstore_strerror(errCode));
2558        }
2559    } else {
2560        throw std::invalid_argument(
2561                "CouchKVStore::getNumItems: Failed to "
2562                "open database file for vBucket = " +
2563                std::to_string(vbid) + " rev = " +
2564                std::to_string(db.getFileRev()) + " with error:" +
2565                couchstore_strerror(errCode));
2566    }
2567    return count;
2568}
2569
2570size_t CouchKVStore::getItemCount(uint16_t vbid) {
2571    if (!isReadOnly()) {
2572        return cachedDocCount.at(vbid);
2573    }
2574    return getDbInfo(vbid).doc_count;
2575}
2576
2577RollbackResult CouchKVStore::rollback(uint16_t vbid, uint64_t rollbackSeqno,
2578                                      std::shared_ptr<RollbackCB> cb) {
2579    DbHolder db(*this);
2580    DbInfo info;
2581    couchstore_error_t errCode;
2582
2583    // Open the vbucket's file and determine the latestSeqno persisted.
2584    errCode = openDB(vbid, db, (uint64_t)COUCHSTORE_OPEN_FLAG_RDONLY);
2585    std::stringstream dbFileName;
2586    dbFileName << dbname << "/" << vbid << ".couch." << db.getFileRev();
2587
2588    if (errCode == COUCHSTORE_SUCCESS) {
2589        errCode = couchstore_db_info(db, &info);
2590        if (errCode != COUCHSTORE_SUCCESS) {
2591            logger.log(EXTENSION_LOG_WARNING,
2592                       "CouchKVStore::rollback: couchstore_db_info error:%s, "
2593                       "name:%s", couchstore_strerror(errCode),
2594                       dbFileName.str().c_str());
2595            return RollbackResult(false, 0, 0, 0);
2596        }
2597    } else {
2598        logger.log(EXTENSION_LOG_WARNING,
2599                   "CouchKVStore::rollback: openDB error:%s, name:%s",
2600                   couchstore_strerror(errCode), dbFileName.str().c_str());
2601        return RollbackResult(false, 0, 0, 0);
2602    }
2603    uint64_t latestSeqno = info.last_sequence;
2604
2605    // Count how many updates are in the vbucket's file. We'll later compare
2606    // this with how many items must be discarded and hence decide if it is
2607    // better to discard everything and start from an empty vBucket.
2608    uint64_t totSeqCount = 0;
2609    errCode = couchstore_changes_count(db, 0, latestSeqno, &totSeqCount);
2610    if (errCode != COUCHSTORE_SUCCESS) {
2611        logger.log(EXTENSION_LOG_WARNING,
2612                   "CouchKVStore::rollback: "
2613                   "couchstore_changes_count(0, %" PRIu64
2614                   ") error:%s [%s], "
2615                   "vb:%" PRIu16 ", rev:%" PRIu64,
2616                   latestSeqno,
2617                   couchstore_strerror(errCode),
2618                   cb_strerror().c_str(),
2619                   vbid,
2620                   db.getFileRev());
2621        return RollbackResult(false, 0, 0, 0);
2622    }
2623
2624    // Open the vBucket file again; and search for a header which is
2625    // before the requested rollback point - the Rollback Header.
2626    DbHolder newdb(*this);
2627    errCode = openDB(vbid, newdb, 0);
2628    if (errCode != COUCHSTORE_SUCCESS) {
2629        logger.log(EXTENSION_LOG_WARNING,
2630                   "CouchKVStore::rollback: openDB#2 error:%s, name:%s",
2631                   couchstore_strerror(errCode), dbFileName.str().c_str());
2632        return RollbackResult(false, 0, 0, 0);
2633    }
2634
2635    while (info.last_sequence > rollbackSeqno) {
2636        errCode = couchstore_rewind_db_header(newdb);
2637        if (errCode != COUCHSTORE_SUCCESS) {
2638            // rewind_db_header cleans up (frees DB) on error; so
2639            // release db in DbHolder to prevent a double-free.
2640            newdb.releaseDb();
2641            logger.log(EXTENSION_LOG_WARNING,
2642                       "CouchKVStore::rollback: couchstore_rewind_db_header "
2643                       "error:%s [%s], vb:%" PRIu16 ", latestSeqno:%" PRIu64
2644                       ", rollbackSeqno:%" PRIu64,
2645                       couchstore_strerror(errCode), cb_strerror().c_str(),
2646                       vbid, latestSeqno, rollbackSeqno);
2647            //Reset the vbucket and send the entire snapshot,
2648            //as a previous header wasn't found.
2649            return RollbackResult(false, 0, 0, 0);
2650        }
2651        errCode = couchstore_db_info(newdb, &info);
2652        if (errCode != COUCHSTORE_SUCCESS) {
2653            logger.log(EXTENSION_LOG_WARNING,
2654                       "CouchKVStore::rollback: couchstore_db_info error:%s, "
2655                       "name:%s", couchstore_strerror(errCode),
2656                       dbFileName.str().c_str());
2657            return RollbackResult(false, 0, 0, 0);
2658        }
2659    }
2660
2661    // Count how many updates we need to discard to rollback to the Rollback
2662    // Header. If this is too many; then prefer to discard everything (than
2663    // have to patch up a large amount of in-memory data).
2664    uint64_t rollbackSeqCount = 0;
2665    errCode = couchstore_changes_count(
2666            db, info.last_sequence, latestSeqno, &rollbackSeqCount);
2667    if (errCode != COUCHSTORE_SUCCESS) {
2668        logger.log(EXTENSION_LOG_WARNING,
2669                   "CouchKVStore::rollback: "
2670                   "couchstore_changes_count#2(%" PRIu64 ", %" PRIu64
2671                   ") "
2672                   "error:%s [%s], vb:%" PRIu16 ", rev:%" PRIu64,
2673                   info.last_sequence,
2674                   latestSeqno,
2675                   couchstore_strerror(errCode),
2676                   cb_strerror().c_str(),
2677                   vbid,
2678                   db.getFileRev());
2679        return RollbackResult(false, 0, 0, 0);
2680    }
2681    if ((totSeqCount / 2) <= rollbackSeqCount) {
2682        //doresetVbucket flag set or rollback is greater than 50%,
2683        //reset the vbucket and send the entire snapshot
2684        return RollbackResult(false, 0, 0, 0);
2685    }
2686
2687    // We have decided to perform a rollback to the Rollback Header.
2688    // Iterate across the series of keys which have been updated /since/ the
2689    // Rollback Header; invoking a callback on each. This allows the caller to
2690    // then inspect the state of the given key in the Rollback Header, and
2691    // correct the in-memory view:
2692    // * If the key is not present in the Rollback header then delete it from
2693    //   the HashTable (if either didn't exist yet, or had previously been
2694    //   deleted in the Rollback header).
2695    // * If the key is present in the Rollback header then replace the in-memory
2696    // value with the value from the Rollback header.
2697    cb->setDbHeader(newdb);
2698    auto cl = std::make_shared<NoLookupCallback>();
2699    ScanContext* ctx = initScanContext(cb, cl, vbid, info.last_sequence+1,
2700                                       DocumentFilter::ALL_ITEMS,
2701                                       ValueFilter::KEYS_ONLY);
2702    scan_error_t error = scan(ctx);
2703    destroyScanContext(ctx);
2704
2705    if (error != scan_success) {
2706        return RollbackResult(false, 0, 0, 0);
2707    }
2708
2709    if (readVBState(newdb, vbid) != ENGINE_SUCCESS) {
2710        return RollbackResult(false, 0, 0, 0);
2711    }
2712    cachedDeleteCount[vbid] = info.deleted_count;
2713    cachedDocCount[vbid] = info.doc_count;
2714
2715    //Append the rewinded header to the database file
2716    errCode = couchstore_commit(newdb);
2717
2718    if (errCode != COUCHSTORE_SUCCESS) {
2719        return RollbackResult(false, 0, 0, 0);
2720    }
2721
2722    vbucket_state* vb_state = getVBucketState(vbid);
2723    return RollbackResult(true, vb_state->highSeqno,
2724                          vb_state->lastSnapStart, vb_state->lastSnapEnd);
2725}
2726
2727int populateAllKeys(Db *db, DocInfo *docinfo, void *ctx) {
2728    AllKeysCtx *allKeysCtx = (AllKeysCtx *)ctx;
2729    // Collections: TODO: Restore to stored namespace
2730    DocKey key = makeDocKey(docinfo->id, false /*restore namespace*/);
2731    (allKeysCtx->cb)->callback(key);
2732    if (--(allKeysCtx->count) <= 0) {
2733        //Only when count met is less than the actual number of entries
2734        return COUCHSTORE_ERROR_CANCEL;
2735    }
2736    return COUCHSTORE_SUCCESS;
2737}
2738
2739ENGINE_ERROR_CODE
2740CouchKVStore::getAllKeys(uint16_t vbid,
2741                         const DocKey start_key,
2742                         uint32_t count,
2743                         std::shared_ptr<Callback<const DocKey&>> cb) {
2744    DbHolder db(*this);
2745    couchstore_error_t errCode = openDB(vbid, db, COUCHSTORE_OPEN_FLAG_RDONLY);
2746    if(errCode == COUCHSTORE_SUCCESS) {
2747        sized_buf ref = {NULL, 0};
2748        ref.buf = (char*) start_key.data();
2749        ref.size = start_key.size();
2750        AllKeysCtx ctx(cb, count);
2751        errCode = couchstore_all_docs(db,
2752                                      &ref,
2753                                      COUCHSTORE_NO_DELETES,
2754                                      populateAllKeys,
2755                                      static_cast<void*>(&ctx));
2756        if (errCode == COUCHSTORE_SUCCESS ||
2757                errCode == COUCHSTORE_ERROR_CANCEL)  {
2758            return ENGINE_SUCCESS;
2759        } else {
2760            logger.log(EXTENSION_LOG_WARNING,
2761                       "CouchKVStore::getAllKeys: couchstore_all_docs "
2762                       "error:%s [%s] vb:%" PRIu16 ", rev:%" PRIu64,
2763                       couchstore_strerror(errCode),
2764                       cb_strerror().c_str(),
2765                       vbid,
2766                       db.getFileRev());
2767        }
2768    } else {
2769        logger.log(EXTENSION_LOG_WARNING,
2770                   "CouchKVStore::getAllKeys: openDB error:%s, vb:%" PRIu16
2771                   ", rev:%" PRIu64,
2772                   couchstore_strerror(errCode),
2773                   vbid,
2774                   db.getFileRev());
2775    }
2776    return ENGINE_FAILED;
2777}
2778
2779void CouchKVStore::unlinkCouchFile(uint16_t vbucket,
2780                                   uint64_t fRev) {
2781
2782    if (isReadOnly()) {
2783        throw std::logic_error("CouchKVStore::unlinkCouchFile: Not valid on a "
2784                "read-only object.");
2785    }
2786    char fname[PATH_MAX];
2787    try {
2788        checked_snprintf(fname, sizeof(fname), "%s/%d.couch.%" PRIu64,
2789                         dbname.c_str(), vbucket, fRev);
2790    } catch (std::exception&) {
2791        LOG(EXTENSION_LOG_WARNING,
2792            "CouchKVStore::unlinkCouchFile: Failed to build filename:%s",
2793            fname);
2794        return;
2795    }
2796
2797    if (remove(fname) == -1) {
2798        logger.log(EXTENSION_LOG_WARNING,
2799                   "CouchKVStore::unlinkCouchFile: remove error:%u, "
2800                   "vb:%" PRIu16 ", rev:%" PRIu64 ", fname:%s", errno, vbucket,
2801                   fRev, fname);
2802
2803        if (errno != ENOENT) {
2804            std::string file_str = fname;
2805            pendingFileDeletions.push(file_str);
2806        }
2807    }
2808}
2809
2810void CouchKVStore::removeCompactFile(const std::string& dbname, uint16_t vbid) {
2811    std::string dbfile = getDBFileName(dbname, vbid, (*dbFileRevMap)[vbid]);
2812    std::string compact_file = dbfile + ".compact";
2813
2814    if (!isReadOnly()) {
2815        removeCompactFile(compact_file);
2816    } else {
2817        logger.log(EXTENSION_LOG_WARNING,
2818                   "CouchKVStore::removeCompactFile: A read-only instance of "
2819                   "the underlying store was not allowed to delete a temporary"
2820                   "file: %s",
2821                   compact_file.c_str());
2822    }
2823}
2824
2825void CouchKVStore::removeCompactFile(const std::string &filename) {
2826    if (isReadOnly()) {
2827        throw std::logic_error("CouchKVStore::removeCompactFile: Not valid on "
2828                "a read-only object.");
2829    }
2830
2831    if (access(filename.c_str(), F_OK) == 0) {
2832        if (remove(filename.c_str()) == 0) {
2833            logger.log(EXTENSION_LOG_WARNING,
2834                       "CouchKVStore::removeCompactFile: Removed compact "
2835                       "filename:%s", filename.c_str());
2836        }
2837        else {
2838            logger.log(EXTENSION_LOG_WARNING,
2839                       "CouchKVStore::removeCompactFile: remove error:%s, "
2840                       "filename:%s", cb_strerror().c_str(), filename.c_str());
2841
2842            if (errno != ENOENT) {
2843                pendingFileDeletions.push(const_cast<std::string &>(filename));
2844            }
2845        }
2846    }
2847}
2848
2849std::string CouchKVStore::getCollectionsManifest(uint16_t vbid) {
2850    DbHolder db(*this);
2851
2852    // openDB logs error details
2853    couchstore_error_t errCode = openDB(vbid, db, COUCHSTORE_OPEN_FLAG_RDONLY);
2854    if (errCode != COUCHSTORE_SUCCESS) {
2855        return {};
2856    }
2857
2858    return readCollectionsManifest(*db);
2859}
2860
2861void CouchKVStore::incrementRevision(uint16_t vbid) {
2862    (*dbFileRevMap)[vbid]++;
2863}
2864
2865uint64_t CouchKVStore::prepareToDelete(uint16_t vbid) {
2866    // Clear the stats so it looks empty (real deletion of the disk data occurs
2867    // later)
2868    cachedDocCount[vbid] = 0;
2869    cachedDeleteCount[vbid] = 0;
2870    cachedFileSize[vbid] = 0;
2871    cachedSpaceUsed[vbid] = 0;
2872    return (*dbFileRevMap)[vbid];
2873}
2874
2875/* end of couch-kvstore.cc */
2876