1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2015 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "config.h"
19
20#ifdef _MSC_VER
21#define PATH_MAX MAX_PATH
22#endif
23
24#include <fcntl.h>
25#include <stdio.h>
26#include <string.h>
27#include <sys/stat.h>
28#include <sys/types.h>
29
#include <algorithm>
#include <cctype>
#include <cstdlib>
#include <fstream>
#include <iostream>
#include <list>
#include <map>
#include <memory>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

#include <cJSON.h>
#include <platform/cb_malloc.h>
#include <platform/checked_snprintf.h>
#include <platform/dirutils.h>
44
45#include "common.h"
46#include "couch-kvstore/couch-kvstore.h"
47#define STATWRITER_NAMESPACE couchstore_engine
48#include "statwriter.h"
49#undef STATWRITER_NAMESPACE
50#include "vbucket.h"
51
52#include <JSON_checker.h>
53#include <snappy-c.h>
54
55using namespace CouchbaseDirectoryUtilities;
56
// Upper bound on attempts to open a vBucket database file.
// NOTE(review): not referenced in this section; presumably consumed by
// openDB's retry logic further down the file -- confirm.
static const int MAX_OPEN_DB_RETRY = 10;

/*
 * MetaData warning
 * Sherlock began storing an extra byte of data (taking meta_len to 19 bytes).
 * Watson (4.6) removes this byte as it is never utilised.
 *
 * WARNING: any *new* meta-data we wish to store may cause upgrade trouble if it
 * takes the length to 19 bytes.
 *
 * Known rev_meta lengths:
 *  - DEFAULT_META_LEN (16): CAS (8 bytes) + expiry time (4) + flags (4); this
 *    is the layout CouchRequest writes at offsets 0, 8 and 12.
 *  - V1_META_LEN (18): the default layout followed by the flex-meta code and
 *    a datatype byte (assumes FLEX_DATA_OFFSET and EXT_META_LEN are both 1 --
 *    confirm against their definitions).
 *  - UNUSED_V2_LEN (19): the Sherlock layout carrying the extra unused byte.
 */
static const uint32_t DEFAULT_META_LEN = 16;
static const uint32_t V1_META_LEN = 18;
#define UNUSED_V2_LEN 19
70
71extern "C" {
72    static int recordDbDumpC(Db *db, DocInfo *docinfo, void *ctx)
73    {
74        return CouchKVStore::recordDbDump(db, docinfo, ctx);
75    }
76}
77
78extern "C" {
79    static int getMultiCbC(Db *db, DocInfo *docinfo, void *ctx)
80    {
81        return CouchKVStore::getMultiCb(db, docinfo, ctx);
82    }
83}
84
85static std::string getStrError(Db *db) {
86    const size_t max_msg_len = 256;
87    char msg[max_msg_len];
88    couchstore_last_os_error(db, msg, max_msg_len);
89    std::string errorStr(msg);
90    return errorStr;
91}
92
93static uint8_t determine_datatype(const unsigned char* value,
94                                  size_t length) {
95    if (checkUTF8JSON(value, length)) {
96        return PROTOCOL_BINARY_DATATYPE_JSON;
97    } else {
98        return PROTOCOL_BINARY_RAW_BYTES;
99    }
100}
101
/**
 * Report whether `filename` ends with the ".compact" suffix that compactDB
 * gives its temporary output file.
 *
 * The suffix is compared directly at the end of the name. Comparing the
 * first match position against size() - sizeof(".compact") is off by one,
 * because sizeof counts the terminating NUL. That form rejects genuine
 * "*.compact" names and accepts names with one extra trailing character.
 */
static bool endWithCompact(const std::string &filename) {
    static const std::string suffix(".compact");
    return filename.size() >= suffix.size() &&
           filename.compare(filename.size() - suffix.size(), suffix.size(),
                            suffix) == 0;
}
110
111static void discoverDbFiles(const std::string &dir,
112                            std::vector<std::string> &v) {
113    std::vector<std::string> files = findFilesContaining(dir, ".couch");
114    std::vector<std::string>::iterator ii;
115    for (ii = files.begin(); ii != files.end(); ++ii) {
116        if (!endWithCompact(*ii)) {
117            v.push_back(*ii);
118        }
119    }
120}
121
122static int getMutationStatus(couchstore_error_t errCode) {
123    switch (errCode) {
124    case COUCHSTORE_SUCCESS:
125        return MUTATION_SUCCESS;
126    case COUCHSTORE_ERROR_NO_HEADER:
127    case COUCHSTORE_ERROR_NO_SUCH_FILE:
128    case COUCHSTORE_ERROR_DOC_NOT_FOUND:
129        // this return causes ep engine to drop the failed flush
130        // of an item since it does not know about the itme any longer
131        return DOC_NOT_FOUND;
132    default:
133        // this return causes ep engine to keep requeuing the failed
134        // flush of an item
135        return MUTATION_FAILED;
136    }
137}
138
/**
 * Return true if every character of `input` is a decimal digit. An empty
 * string yields true (vacuously), exactly as before.
 *
 * Each character is converted to unsigned char before reaching isdigit().
 * Passing a negative char (any byte >= 0x80 where char is signed) to
 * isdigit() is undefined behaviour. The parameter is now a const reference,
 * which existing callers passing a mutable string still satisfy.
 */
static bool allDigit(const std::string &input) {
    return std::all_of(input.begin(), input.end(), [](unsigned char c) {
        return std::isdigit(c) != 0;
    });
}
148
149static std::string couchkvstore_strerrno(Db *db, couchstore_error_t err) {
150    return (err == COUCHSTORE_ERROR_OPEN_FILE ||
151            err == COUCHSTORE_ERROR_READ ||
152            err == COUCHSTORE_ERROR_WRITE) ? getStrError(db) : "none";
153}
154
155struct GetMultiCbCtx {
156    GetMultiCbCtx(CouchKVStore &c, uint16_t v, vb_bgfetch_queue_t &f) :
157        cks(c), vbId(v), fetches(f) {}
158
159    CouchKVStore &cks;
160    uint16_t vbId;
161    vb_bgfetch_queue_t &fetches;
162};
163
164struct StatResponseCtx {
165public:
166    StatResponseCtx(std::map<std::pair<uint16_t, uint16_t>, vbucket_state> &sm,
167                    uint16_t vb) : statMap(sm), vbId(vb) {
168        /* EMPTY */
169    }
170
171    std::map<std::pair<uint16_t, uint16_t>, vbucket_state> &statMap;
172    uint16_t vbId;
173};
174
175struct AllKeysCtx {
176    AllKeysCtx(std::shared_ptr<Callback<uint16_t&, char*&> > callback, uint32_t cnt)
177        : cb(callback), count(cnt) { }
178
179    std::shared_ptr<Callback<uint16_t&, char*&> > cb;
180    uint32_t count;
181};
182
/**
 * Build a couchstore write request (dbDoc + dbDocInfo) for `it`.
 *
 * The rev_meta buffer `meta` is laid out as follows:
 *  - CAS (8 bytes, network order);
 *  - expiry time (4 bytes, network order);
 *  - flags (4 bytes, copied without byte-order conversion);
 *  - FLEX_META_CODE at offset DEFAULT_META_LEN;
 *  - extended metadata (the datatype) at DEFAULT_META_LEN + FLEX_DATA_OFFSET.
 *
 * @param it   item being persisted
 * @param rev  revision number of the vBucket file this request targets
 * @param cb   callback holder stored with the request (set()/del() fill in
 *             setCb/delCb respectively)
 * @param del  true for a deletion: the expiry field then records the
 *             deletion time and the stored datatype is forced to raw bytes
 */
CouchRequest::CouchRequest(const Item &it, uint64_t rev,
                           MutationRequestCallback &cb, bool del)
    : IORequest(it.getVBucketId(), cb, del, it.getKey()), value(it.getValue()),
      fileRevNum(rev)
{
    uint64_t cas = htonll(it.getCas());
    uint32_t flags = it.getFlags();
    uint32_t vlen = it.getNBytes();
    uint32_t exptime = it.getExptime();

    // Datatype used to determine whether document requires compression or not
    uint8_t datatype;

    // Save time of deletion in expiry time field of deleted item's metadata.
    if (del) {
        exptime = ep_real_time();
    }
    exptime = htonl(exptime);

    // The document body points into `value` (held by this request) rather
    // than being copied; an empty value is stored with a NULL buffer and
    // datatype 0.
    dbDoc.id.buf = const_cast<char *>(key.c_str());
    dbDoc.id.size = it.getNKey();
    if (vlen) {
        dbDoc.data.buf = const_cast<char *>(value->getData());
        dbDoc.data.size = vlen;
        datatype = it.getDataType();
    } else {
        dbDoc.data.buf = NULL;
        dbDoc.data.size = 0;
        datatype = 0x00;
    }

    // Fixed 16-byte header (CAS, expiry, flags) followed by the flex code.
    memset(meta, 0, sizeof(meta));
    memcpy(meta, &cas, 8);
    memcpy(meta + 8, &exptime, 4);
    memcpy(meta + 12, &flags, 4);
    *(meta + DEFAULT_META_LEN) = FLEX_META_CODE;

    // For a deleted item there is no extended metadata available on the
    // item object, so the stored datatype defaults to
    // PROTOCOL_BINARY_RAW_BYTES.
    if (del) {
        uint8_t del_datatype = PROTOCOL_BINARY_RAW_BYTES;
        memcpy(meta + DEFAULT_META_LEN + FLEX_DATA_OFFSET,
               &del_datatype, sizeof(uint8_t));
    } else {
        memcpy(meta + DEFAULT_META_LEN + FLEX_DATA_OFFSET, it.getExtMeta(),
               it.getExtMetaLen());
    }

    dbDocInfo.db_seq = it.getBySeqno();
    dbDocInfo.rev_meta.buf = reinterpret_cast<char *>(meta);
    dbDocInfo.rev_meta.size = COUCHSTORE_METADATA_SIZE;
    dbDocInfo.rev_seq = it.getRevSeqno();
    dbDocInfo.size = dbDoc.data.size;
    if (del) {
        dbDocInfo.deleted =  1;
    } else {
        dbDocInfo.deleted = 0;
    }
    dbDocInfo.id = dbDoc.id;
    dbDocInfo.content_meta = (datatype == PROTOCOL_BINARY_DATATYPE_JSON) ?
                                    COUCH_DOC_IS_JSON : COUCH_DOC_NON_JSON_MODE;

    // Compress only those documents that aren't already compressed.
    // NOTE(review): `deleteItem` is presumably the IORequest member
    // initialised from `del`; setting COUCH_DOC_IS_COMPRESSED presumably asks
    // couchstore to snappy-compress the body on write -- confirm.
    if (dbDoc.data.size > 0 && !deleteItem) {
        if (datatype == PROTOCOL_BINARY_RAW_BYTES ||
                datatype == PROTOCOL_BINARY_DATATYPE_JSON) {
            dbDocInfo.content_meta |= COUCH_DOC_IS_COMPRESSED;
        }
    }
}
254
255CouchKVStore::CouchKVStore(KVStoreConfig &config, bool read_only) :
256    KVStore(config, read_only), dbname(config.getDBName()),
257    intransaction(false), backfillCounter(0)
258{
259    createDataDir(dbname);
260    statCollectingFileOps = getCouchstoreStatsOps(&st.fsStats);
261    statCollectingFileOpsCompaction = getCouchstoreStatsOps(&st.fsStatsCompaction);
262
263    // init db file map with default revision number, 1
264    numDbFiles = configuration.getMaxVBuckets();
265    cachedVBStates.reserve(numDbFiles);
266
267    // pre-allocate lookup maps (vectors) given we have a relatively
268    // small, fixed number of vBuckets.
269    dbFileRevMap.assign(numDbFiles, Couchbase::RelaxedAtomic<uint64_t>(1));
270    cachedDocCount.assign(numDbFiles, Couchbase::RelaxedAtomic<size_t>(-1));
271    cachedDeleteCount.assign(numDbFiles, Couchbase::RelaxedAtomic<size_t>(-1));
272    cachedVBStates.assign(numDbFiles, nullptr);
273
274    initialize();
275}
276
277CouchKVStore::CouchKVStore(const CouchKVStore &copyFrom) :
278    KVStore(copyFrom), dbname(copyFrom.dbname),
279    dbFileRevMap(copyFrom.dbFileRevMap), numDbFiles(copyFrom.numDbFiles),
280    intransaction(false)
281{
282    createDataDir(dbname);
283    statCollectingFileOps = getCouchstoreStatsOps(&st.fsStats);
284}
285
286void CouchKVStore::initialize() {
287    std::vector<uint16_t> vbids;
288    std::vector<std::string> files;
289    discoverDbFiles(dbname, files);
290    populateFileNameMap(files, &vbids);
291
292    Db *db = NULL;
293    couchstore_error_t errorCode;
294
295    std::vector<uint16_t>::iterator itr = vbids.begin();
296    for (; itr != vbids.end(); ++itr) {
297        uint16_t id = *itr;
298        uint64_t rev = dbFileRevMap[id];
299
300        errorCode = openDB(id, rev, &db, COUCHSTORE_OPEN_FLAG_RDONLY);
301        if (errorCode == COUCHSTORE_SUCCESS) {
302            readVBState(db, id);
303            /* update stat */
304            ++st.numLoadedVb;
305            closeDatabaseHandle(db);
306        } else {
307            LOG(EXTENSION_LOG_WARNING, "Failed to open database file "
308                "%s/%" PRIu16 ".couch.%" PRIu64, dbname.c_str(), id, rev);
309            remVBucketFromDbFileMap(id);
310            cachedVBStates[id] = NULL;
311        }
312
313        db = NULL;
314        if (!isReadOnly()) {
315            removeCompactFile(dbname, id, rev);
316        }
317    }
318}
319
320CouchKVStore::~CouchKVStore() {
321    close();
322
323    for (std::vector<vbucket_state *>::iterator it = cachedVBStates.begin();
324         it != cachedVBStates.end(); it++) {
325        vbucket_state *vbstate = *it;
326        if (vbstate) {
327            delete vbstate;
328            *it = NULL;
329        }
330    }
331}
332void CouchKVStore::reset(uint16_t vbucketId) {
333    if (isReadOnly()) {
334        throw std::logic_error("CouchKVStore::reset: Not valid on a read-only "
335                        "object.");
336    }
337
338    vbucket_state *state = cachedVBStates[vbucketId];
339    if (state) {
340        state->reset();
341
342        cachedDocCount[vbucketId] = 0;
343        cachedDeleteCount[vbucketId] = 0;
344
345        //Unlink the couchstore file upon reset
346        unlinkCouchFile(vbucketId, dbFileRevMap[vbucketId]);
347        setVBucketState(vbucketId, *state, NULL, true);
348        updateDbFileMap(vbucketId, 1);
349    } else {
350        throw std::invalid_argument("CouchKVStore::reset: No entry in cached "
351                        "states for vbucket " + std::to_string(vbucketId));
352    }
353}
354
355void CouchKVStore::set(const Item &itm, Callback<mutation_result> &cb) {
356    if (isReadOnly()) {
357        throw std::logic_error("CouchKVStore::set: Not valid on a read-only "
358                        "object.");
359    }
360    if (!intransaction) {
361        throw std::invalid_argument("CouchKVStore::set: intransaction must be "
362                        "true to perform a set operation.");
363    }
364
365    bool deleteItem = false;
366    MutationRequestCallback requestcb;
367    uint64_t fileRev = dbFileRevMap[itm.getVBucketId()];
368
369    // each req will be de-allocated after commit
370    requestcb.setCb = &cb;
371    CouchRequest *req = new CouchRequest(itm, fileRev, requestcb, deleteItem);
372    pendingReqsQ.push_back(req);
373}
374
375void CouchKVStore::get(const std::string &key, uint16_t vb,
376                       Callback<GetValue> &cb, bool fetchDelete) {
377    Db *db = NULL;
378    GetValue rv;
379    uint64_t fileRev = dbFileRevMap[vb];
380
381    couchstore_error_t errCode = openDB(vb, fileRev, &db,
382                                        COUCHSTORE_OPEN_FLAG_RDONLY);
383    if (errCode != COUCHSTORE_SUCCESS) {
384        ++st.numGetFailure;
385        LOG(EXTENSION_LOG_WARNING,
386            "Failed to open database to retrieve data "
387            "from vBucketId = %d, key = %s\n",
388            vb, key.c_str());
389        rv.setStatus(couchErr2EngineErr(errCode));
390        cb.callback(rv);
391        return;
392    }
393
394    getWithHeader(db, key, vb, cb, fetchDelete);
395    closeDatabaseHandle(db);
396}
397
/**
 * Look up `key` in an already-open database handle and deliver the result
 * (value plus engine status) through `cb`.
 *
 * If `cb` is a RememberingCallback whose stored value is marked partial,
 * only metadata is fetched; docinfo lookup failures are then not logged.
 * Read latency and, on success, key+value size are recorded in the read
 * histograms. Every failure increments numGetFailure.
 *
 * @param dbHandle     open couchstore Db* (passed as void*)
 * @param key          document key
 * @param vb           vBucket id (used for logging and fetchDoc)
 * @param cb           receives the GetValue with status couchErr2EngineErr()
 * @param fetchDelete  forwarded to fetchDoc()
 * @throws std::logic_error if couchstore reports success with a NULL DocInfo
 */
void CouchKVStore::getWithHeader(void *dbHandle, const std::string &key,
                                 uint16_t vb, Callback<GetValue> &cb,
                                 bool fetchDelete) {

    Db *db = (Db *)dbHandle;
    hrtime_t start = gethrtime();
    // Metadata-only request: the caller supplied a RememberingCallback whose
    // remembered value is flagged as partial.
    RememberingCallback<GetValue> *rc = dynamic_cast<RememberingCallback<GetValue> *>(&cb);
    bool getMetaOnly = rc && rc->val.isPartial();
    DocInfo *docInfo = NULL;
    sized_buf id;
    GetValue rv;

    // The lookup key aliases key's storage; no copy is made.
    id.size = key.size();
    id.buf = const_cast<char *>(key.c_str());

    couchstore_error_t errCode = couchstore_docinfo_by_id(db, (uint8_t *)id.buf,
                                                          id.size, &docInfo);
    if (errCode != COUCHSTORE_SUCCESS) {
        if (!getMetaOnly) {
            // log error only if this is non-xdcr case
            LOG(EXTENSION_LOG_WARNING,
                "Failed to retrieve doc info from "
                "database, vbucketId=%d, key=%s error=%s [%s]\n",
                vb, id.buf, couchstore_strerror(errCode),
                couchkvstore_strerrno(db, errCode).c_str());
        }
    } else {
        if (docInfo == nullptr) {
            throw std::logic_error("CouchKVStore::getWithHeader: "
                    "couchstore_docinfo_by_id returned success but docInfo "
                    "is NULL");
        }
        errCode = fetchDoc(db, docInfo, rv, vb, getMetaOnly, fetchDelete);
        if (errCode != COUCHSTORE_SUCCESS) {
            LOG(EXTENSION_LOG_WARNING,
                "Failed to retrieve key value from "
                "database, vbucketId=%d key=%s error=%s [%s] "
                "deleted=%s", vb, id.buf,
                couchstore_strerror(errCode),
                couchkvstore_strerrno(db, errCode).c_str(),
                docInfo->deleted ? "yes" : "no");
        }

        // record stats (elapsed hrtime divided by 1000; presumably ns -> us)
        st.readTimeHisto.add((gethrtime() - start) / 1000);
        if (errCode == COUCHSTORE_SUCCESS) {
            st.readSizeHisto.add(key.length() + rv.getValue()->getNBytes());
        }
    }

    if(errCode != COUCHSTORE_SUCCESS) {
        ++st.numGetFailure;
    }

    // docInfo may still be NULL here if the docinfo lookup failed.
    couchstore_free_docinfo(docInfo);
    rv.setStatus(couchErr2EngineErr(errCode));
    cb.callback(rv);
}
456
457void CouchKVStore::getMulti(uint16_t vb, vb_bgfetch_queue_t &itms) {
458    int numItems = itms.size();
459    uint64_t fileRev = dbFileRevMap[vb];
460
461    Db *db = NULL;
462    couchstore_error_t errCode = openDB(vb, fileRev, &db,
463                                        COUCHSTORE_OPEN_FLAG_RDONLY);
464    if (errCode != COUCHSTORE_SUCCESS) {
465        LOG(EXTENSION_LOG_WARNING,
466            "Failed to open database for data fetch, "
467            "vBucketId = %" PRIu16 ", numDocs = %d\n",
468            vb, numItems);
469        st.numGetFailure.fetch_add(numItems);
470        vb_bgfetch_queue_t::iterator itr = itms.begin();
471        for (; itr != itms.end(); ++itr) {
472            vb_bgfetch_item_ctx_t &bg_itm_ctx = (*itr).second;
473            std::list<VBucketBGFetchItem *> &fetches = bg_itm_ctx.bgfetched_list;
474            std::list<VBucketBGFetchItem *>::iterator fitr = fetches.begin();
475            for (; fitr != fetches.end(); ++fitr) {
476                (*fitr)->value.setStatus(ENGINE_NOT_MY_VBUCKET);
477            }
478        }
479        return;
480    }
481
482    size_t idx = 0;
483    sized_buf *ids = new sized_buf[itms.size()];
484    vb_bgfetch_queue_t::iterator itr = itms.begin();
485    for (; itr != itms.end(); ++itr) {
486        ids[idx].size = itr->first.size();
487        ids[idx].buf = const_cast<char *>(itr->first.c_str());
488        ++idx;
489    }
490
491    GetMultiCbCtx ctx(*this, vb, itms);
492
493    errCode = couchstore_docinfos_by_id(db, ids, itms.size(),
494                                        0, getMultiCbC, &ctx);
495    if (errCode != COUCHSTORE_SUCCESS) {
496        st.numGetFailure.fetch_add(numItems);
497        for (itr = itms.begin(); itr != itms.end(); ++itr) {
498            LOG(EXTENSION_LOG_WARNING, "Failed to read database by"
499                " vBucketId = %" PRIu16 " key = %s error = %s [%s]\n",
500                vb, (*itr).first.c_str(),
501                couchstore_strerror(errCode),
502                couchkvstore_strerrno(db, errCode).c_str());
503            vb_bgfetch_item_ctx_t &bg_itm_ctx = (*itr).second;
504            std::list<VBucketBGFetchItem *> &fetches = bg_itm_ctx.bgfetched_list;
505            std::list<VBucketBGFetchItem *>::iterator fitr = fetches.begin();
506            for (; fitr != fetches.end(); ++fitr) {
507                (*fitr)->value.setStatus(couchErr2EngineErr(errCode));
508            }
509        }
510    }
511    closeDatabaseHandle(db);
512    delete []ids;
513}
514
515void CouchKVStore::del(const Item &itm,
516                       Callback<int> &cb) {
517    if (isReadOnly()) {
518        throw std::logic_error("CouchKVStore::del: Not valid on a read-only "
519                        "object.");
520    }
521    if (!intransaction) {
522        throw std::invalid_argument("CouchKVStore::del: intransaction must be "
523                        "true to perform a delete operation.");
524    }
525
526    uint64_t fileRev = dbFileRevMap[itm.getVBucketId()];
527    MutationRequestCallback requestcb;
528    requestcb.delCb = &cb;
529    CouchRequest *req = new CouchRequest(itm, fileRev, requestcb, true);
530    pendingReqsQ.push_back(req);
531}
532
533void CouchKVStore::delVBucket(uint16_t vbucket) {
534    if (isReadOnly()) {
535        throw std::logic_error("CouchKVStore::delVBucket: Not valid on a "
536                        "read-only object.");
537    }
538
539    unlinkCouchFile(vbucket, dbFileRevMap[vbucket]);
540
541    if (cachedVBStates[vbucket]) {
542        delete cachedVBStates[vbucket];
543    }
544
545    std::string failovers("[{\"id\":0, \"seq\":0}]");
546    cachedVBStates[vbucket] = new vbucket_state(vbucket_state_dead, 0, 0, 0, 0,
547                                                0, 0, 0, failovers);
548    updateDbFileMap(vbucket, 1);
549}
550
551std::vector<vbucket_state *> CouchKVStore::listPersistedVbuckets() {
552    return cachedVBStates;
553}
554
555void CouchKVStore::getPersistedStats(std::map<std::string,
556                                     std::string> &stats) {
557    char *buffer = NULL;
558    std::string fname = dbname + "/stats.json";
559    if (access(fname.c_str(), R_OK) == -1) {
560        return ;
561    }
562
563    std::ifstream session_stats;
564    session_stats.exceptions (session_stats.failbit | session_stats.badbit);
565    try {
566        session_stats.open(fname.c_str(), std::ios::binary);
567        session_stats.seekg(0, std::ios::end);
568        int flen = session_stats.tellg();
569        if (flen < 0) {
570            LOG(EXTENSION_LOG_WARNING,
571                "Error in session stats ifstream!!!");
572            session_stats.close();
573            return;
574        }
575        session_stats.seekg(0, std::ios::beg);
576        buffer = new char[flen + 1];
577        session_stats.read(buffer, flen);
578        session_stats.close();
579        buffer[flen] = '\0';
580
581        cJSON *json_obj = cJSON_Parse(buffer);
582        if (!json_obj) {
583            LOG(EXTENSION_LOG_WARNING,
584                "Failed to parse the session stats json doc!!!");
585            delete[] buffer;
586            return;
587        }
588
589        int json_arr_size = cJSON_GetArraySize(json_obj);
590        for (int i = 0; i < json_arr_size; ++i) {
591            cJSON *obj = cJSON_GetArrayItem(json_obj, i);
592            if (obj) {
593                stats[obj->string] = obj->valuestring ? obj->valuestring : "";
594            }
595        }
596        cJSON_Delete(json_obj);
597
598    } catch (const std::ifstream::failure &e) {
599        LOG(EXTENSION_LOG_WARNING,
600            "Failed to load the engine session stats "
601            "due to IO exception \"%s\"", e.what());
602    } catch (...) {
603        LOG(EXTENSION_LOG_WARNING,
604            "Failed to load the engine session stats "
605            "due to IO exception");
606    }
607
608    delete[] buffer;
609}
610
/**
 * Build the path of a vBucket database file: "<dbname>/<vbid>.couch.<rev>",
 * with both numbers in decimal.
 */
static std::string getDBFileName(const std::string &dbname,
                                 uint16_t vbid,
                                 uint64_t rev) {
    std::string path(dbname);
    path += '/';
    path += std::to_string(vbid);
    path += ".couch.";
    path += std::to_string(rev);
    return path;
}
618
619static int edit_docinfo_hook(DocInfo **info, const sized_buf *item) {
620    if ((*info)->rev_meta.size == DEFAULT_META_LEN) {
621        // Metadata doesn't have flex_meta_code and datatype so provision space
622        // for these paramenters.
623        const unsigned char* data;
624        bool ret;
625        if (((*info)->content_meta | COUCH_DOC_IS_COMPRESSED) ==
626                (*info)->content_meta) {
627            size_t uncompr_len;
628            snappy_uncompressed_length(item->buf, item->size, &uncompr_len);
629            char *dbuf = (char *) cb_malloc(uncompr_len);
630            snappy_uncompress(item->buf, item->size, dbuf, &uncompr_len);
631            data = (const unsigned char*)dbuf;
632            ret = checkUTF8JSON(data, uncompr_len);
633            cb_free(dbuf);
634        } else {
635            data = (const unsigned char*)item->buf;
636            ret = checkUTF8JSON(data, item->size);
637        }
638        uint8_t flex_code = FLEX_META_CODE;
639        uint8_t datatype;
640        if (ret) {
641            datatype = PROTOCOL_BINARY_DATATYPE_JSON;
642        } else {
643            datatype = PROTOCOL_BINARY_RAW_BYTES;
644        }
645
646        DocInfo *docinfo = (DocInfo *) cb_calloc(1,
647                                               sizeof(DocInfo) +
648                                               (*info)->id.size +
649                                               (*info)->rev_meta.size +
650                                               FLEX_DATA_OFFSET + EXT_META_LEN +
651                                               sizeof(uint8_t));
652        if (!docinfo) {
653            LOG(EXTENSION_LOG_WARNING, "Failed to allocate docInfo, "
654                    "while editing docinfo in the compaction's docinfo_hook");
655            return 0;
656        }
657
658        char *extra = (char *)docinfo + sizeof(DocInfo);
659        memcpy(extra, (*info)->id.buf, (*info)->id.size);
660        docinfo->id.buf = extra;
661        docinfo->id.size = (*info)->id.size;
662
663        extra += (*info)->id.size;
664        memcpy(extra, (*info)->rev_meta.buf, (*info)->rev_meta.size);
665        memcpy(extra + (*info)->rev_meta.size,
666               &flex_code, FLEX_DATA_OFFSET);
667        memcpy(extra + (*info)->rev_meta.size + FLEX_DATA_OFFSET,
668               &datatype, sizeof(uint8_t));
669        docinfo->rev_meta.buf = extra;
670        docinfo->rev_meta.size = (*info)->rev_meta.size +
671                                 FLEX_DATA_OFFSET + EXT_META_LEN +
672                                 sizeof(uint8_t);
673
674        docinfo->db_seq = (*info)->db_seq;
675        docinfo->rev_seq = (*info)->rev_seq;
676        docinfo->deleted = (*info)->deleted;
677        docinfo->content_meta = (*info)->content_meta;
678        docinfo->bp = (*info)->bp;
679        docinfo->size = (*info)->size;
680
681        couchstore_free_docinfo(*info);
682        *info = docinfo;
683        return 1;
684    }
685    return 0;
686}
687
688static int time_purge_hook(Db* d, DocInfo* info, void* ctx_p) {
689    compaction_ctx* ctx = (compaction_ctx*) ctx_p;
690    DbInfo infoDb;
691
692    couchstore_db_info(d, &infoDb);
693    //Compaction finished
694    if (info == NULL) {
695        return couchstore_set_purge_seq(d, ctx->max_purged_seq);
696    }
697
698    if (info->rev_meta.size >= DEFAULT_META_LEN) {
699        uint32_t exptime;
700        memcpy(&exptime, info->rev_meta.buf + 8, 4);
701        exptime = ntohl(exptime);
702        if (info->deleted) {
703            if (info->db_seq != infoDb.last_sequence) {
704                if (ctx->drop_deletes) { // all deleted items must be dropped ...
705                    if (ctx->max_purged_seq < info->db_seq) {
706                        ctx->max_purged_seq = info->db_seq; // track max_purged_seq
707                    }
708                    return COUCHSTORE_COMPACT_DROP_ITEM;      // ...unconditionally
709                }
710                if (exptime < ctx->purge_before_ts &&
711                        (!ctx->purge_before_seq ||
712                         info->db_seq <= ctx->purge_before_seq)) {
713                    if (ctx->max_purged_seq < info->db_seq) {
714                        ctx->max_purged_seq = info->db_seq;
715                    }
716                    return COUCHSTORE_COMPACT_DROP_ITEM;
717                }
718            }
719        } else if (exptime && exptime < ctx->curr_time) {
720            std::string key(info->id.buf, info->id.size);
721            ctx->expiryCallback->callback(key, info->rev_seq);
722        }
723    }
724
725    if (ctx->bloomFilterCallback) {
726        bool deleted = info->deleted;
727        std::string key((const char *)info->id.buf, info->id.size);
728        ctx->bloomFilterCallback->callback(key, deleted);
729    }
730
731    return COUCHSTORE_COMPACT_KEEP_ITEM;
732}
733
/**
 * Compact vBucket hook_ctx->db_file_id into a new file revision.
 *
 * Steps:
 *  1. open <vb>.couch.<rev> read-only with the compaction stat file ops;
 *  2. compact it into <vb>.couch.<rev>.compact, running time_purge_hook and
 *     edit_docinfo_hook;
 *  3. rename the result to <vb>.couch.<rev+1>;
 *  4. open it, switch the file map to rev+1 and report its size via `kvcb`;
 *  5. refresh the cached state/counts and unlink the old revision.
 *
 * @return false if any step fails (the old file is left in place)
 * @throws std::logic_error on a read-only instance
 */
bool CouchKVStore::compactDB(compaction_ctx *hook_ctx,
                             Callback<kvstats_ctx> &kvcb) {
    if (isReadOnly()) {
        throw std::logic_error("CouchKVStore::compactDB: Cannot perform "
                        "on a read-only instance.");
    }

    couchstore_compact_hook       hook = time_purge_hook;
    couchstore_docinfo_hook      dhook = edit_docinfo_hook;
    const couch_file_ops     *def_iops = &statCollectingFileOpsCompaction;
    Db                      *compactdb = NULL;
    Db                       *targetDb = NULL;
    couchstore_error_t         errCode = COUCHSTORE_SUCCESS;
    hrtime_t                     start = gethrtime();
    std::string                 dbfile;
    std::string           compact_file;
    std::string               new_file;
    kvstats_ctx                  kvctx;
    DbInfo                        info;
    uint16_t                      vbid = hook_ctx->db_file_id;
    uint64_t                   fileRev = dbFileRevMap[vbid];
    uint64_t                   new_rev = fileRev + 1;

    // Open the source VBucket database file ...
    errCode = openDB(vbid, fileRev, &compactdb,
                     (uint64_t)COUCHSTORE_OPEN_FLAG_RDONLY, nullptr, false,
                     def_iops);
    if (errCode != COUCHSTORE_SUCCESS) {
        LOG(EXTENSION_LOG_WARNING,
                "Failed to open database, vbucketId = %d "
                "fileRev = %" PRIu64, vbid, fileRev);
        return false;
    }

    // Build the temporary vbucket.compact file name
    dbfile       = getDBFileName(dbname, vbid, fileRev);
    compact_file = dbfile + ".compact";

    // Perform COMPACTION of vbucket.couch.rev into vbucket.couch.rev.compact
    errCode = couchstore_compact_db_ex(compactdb, compact_file.c_str(),
                                       COUCHSTORE_COMPACT_FLAG_UPGRADE_DB,
                                       hook, dhook, hook_ctx, def_iops);
    if (errCode != COUCHSTORE_SUCCESS) {
        // NOTE(review): a partial .compact file is not removed here; it is
        // presumably cleaned up by couchstore or later by removeCompactFile()
        // in initialize() -- confirm.
        LOG(EXTENSION_LOG_WARNING,
            "Failed to compact database with name=%s "
            "error=%s errno=%s",
            dbfile.c_str(),
            couchstore_strerror(errCode),
            couchkvstore_strerrno(compactdb, errCode).c_str());
        closeDatabaseHandle(compactdb);
        return false;
    }

    // Close the source Database File once compaction is done
    closeDatabaseHandle(compactdb);

    // Rename the .compact file to one with the next revision number
    new_file = getDBFileName(dbname, vbid, new_rev);
    if (rename(compact_file.c_str(), new_file.c_str()) != 0) {
        LOG(EXTENSION_LOG_WARNING,
            "Failed to rename '%s' to '%s': %s",
            compact_file.c_str(), new_file.c_str(),
            cb_strerror().c_str());

        removeCompactFile(compact_file);
        return false;
    }

    // Open the newly compacted VBucket database file ...
    // NOTE(review): unlike the source open above, no def_iops is passed, so
    // this handle presumably uses the default (non-compaction) file ops.
    errCode = openDB(vbid, new_rev, &targetDb,
                     (uint64_t)COUCHSTORE_OPEN_FLAG_RDONLY, NULL);
    if (errCode != COUCHSTORE_SUCCESS) {
        LOG(EXTENSION_LOG_WARNING,
                "Failed to open compacted database file %s "
                "fileRev = %" PRIu64, new_file.c_str(), new_rev);
        if (remove(new_file.c_str()) != 0) {
            LOG(EXTENSION_LOG_WARNING,
                "Warning: Failed to remove '%s': %s",
                new_file.c_str(), cb_strerror().c_str());
        }
        return false;
    }

    // Update the global VBucket file map so all operations use the new file
    updateDbFileMap(vbid, new_rev);

    LOG(EXTENSION_LOG_INFO,
            "INFO: created new couch db file, name=%s rev=%" PRIu64,
            new_file.c_str(), new_rev);

    // Update stats to caller
    kvctx.vbucket = vbid;
    couchstore_db_info(targetDb, &info);
    kvctx.fileSpaceUsed = info.space_used;
    kvctx.fileSize = info.file_size;
    kvcb.callback(kvctx);

    // also update cached state with dbinfo
    vbucket_state *state = cachedVBStates[vbid];
    if (state) {
        state->highSeqno = info.last_sequence;
        state->purgeSeqno = info.purge_seq;
        cachedDeleteCount[vbid] = info.deleted_count;
        cachedDocCount[vbid] = info.doc_count;
    }

    closeDatabaseHandle(targetDb);

    // Removing the stale couch file
    unlinkCouchFile(vbid, fileRev);

    // Elapsed hrtime divided by 1000 (presumably ns -> us).
    st.compactHisto.add((gethrtime() - start) / 1000);

    return true;
}
849
850vbucket_state * CouchKVStore::getVBucketState(uint16_t vbucketId) {
851    return cachedVBStates[vbucketId];
852}
853
854bool CouchKVStore::setVBucketState(uint16_t vbucketId,
855                                   const vbucket_state &vbstate,
856                                   Callback<kvstats_ctx> *kvcb, bool reset) {
857    Db *db = NULL;
858    uint64_t fileRev, newFileRev;
859    std::stringstream id, rev;
860    std::string dbFileName;
861    std::map<uint16_t, uint64_t>::iterator mapItr;
862    kvstats_ctx kvctx;
863    kvctx.vbucket = vbucketId;
864
865    id << vbucketId;
866    fileRev = dbFileRevMap[vbucketId];
867    rev << fileRev;
868    dbFileName = dbname + "/" + id.str() + ".couch." + rev.str();
869
870    couchstore_error_t errorCode;
871    errorCode = openDB(vbucketId, fileRev, &db,
872            (uint64_t)COUCHSTORE_OPEN_FLAG_CREATE, &newFileRev, reset);
873    if (errorCode != COUCHSTORE_SUCCESS) {
874        ++st.numVbSetFailure;
875        LOG(EXTENSION_LOG_WARNING,
876                "Failed to open database, name=%s",
877                dbFileName.c_str());
878        return false;
879    }
880
881    fileRev = newFileRev;
882    rev << fileRev;
883    dbFileName = dbname + "/" + id.str() + ".couch." + rev.str();
884
885    errorCode = saveVBState(db, vbstate);
886    if (errorCode != COUCHSTORE_SUCCESS) {
887        ++st.numVbSetFailure;
888        LOG(EXTENSION_LOG_WARNING,
889                "Failed to save local doc, name=%s",
890                dbFileName.c_str());
891        closeDatabaseHandle(db);
892        return false;
893    }
894
895    errorCode = couchstore_commit(db);
896    if (errorCode != COUCHSTORE_SUCCESS) {
897        ++st.numVbSetFailure;
898        LOG(EXTENSION_LOG_WARNING,
899                "Commit failed, vbid=%u rev=%" PRIu64 " error=%s [%s]",
900                vbucketId, fileRev, couchstore_strerror(errorCode),
901                couchkvstore_strerrno(db, errorCode).c_str());
902        closeDatabaseHandle(db);
903        return false;
904    }
905    if (kvcb) {
906        DbInfo info;
907        couchstore_db_info(db, &info);
908        kvctx.fileSpaceUsed = info.space_used;
909        kvctx.fileSize = info.file_size;
910        kvcb->callback(kvctx);
911    }
912    closeDatabaseHandle(db);
913
914    return true;
915}
916
917bool CouchKVStore::snapshotVBucket(uint16_t vbucketId,
918                                   const vbucket_state &vbstate,
919                                   Callback<kvstats_ctx> *cb, bool persist) {
920    if (isReadOnly()) {
921        LOG(EXTENSION_LOG_WARNING,
922            "Snapshotting a vbucket cannot be performed on a read-only "
923            "KVStore instance");
924        return false;
925    }
926
927    hrtime_t start = gethrtime();
928
929    if (updateCachedVBState(vbucketId, vbstate) && persist) {
930        vbucket_state *vbs = cachedVBStates[vbucketId];
931        if (!setVBucketState(vbucketId, *vbs, cb)) {
932            LOG(EXTENSION_LOG_WARNING,
933                "Failed to persist new state, %s, for vbucket %d\n",
934                VBucket::toString(vbstate.state), vbucketId);
935           return false;
936        }
937    }
938
939    LOG(EXTENSION_LOG_DEBUG,
940        "CouchKVStore::snapshotVBucket: Snapshotted vbucket:%" PRIu16 " state:%s",
941        vbucketId,
942        vbstate.toJSON().c_str());
943
944    st.snapshotHisto.add((gethrtime() - start) / 1000);
945
946    return true;
947}
948
949StorageProperties CouchKVStore::getStorageProperties() {
950    StorageProperties rv(true, true, true, true);
951    return rv;
952}
953
954bool CouchKVStore::commit(Callback<kvstats_ctx> *cb) {
955    if (isReadOnly()) {
956        throw std::logic_error("CouchKVStore::commit: Not valid on a read-only "
957                        "object.");
958    }
959
960    if (intransaction) {
961        if (commit2couchstore(cb)) {
962            intransaction = false;
963        }
964    }
965
966    return !intransaction;
967}
968
969void CouchKVStore::addStats(const std::string &prefix,
970                            ADD_STAT add_stat,
971                            const void *c) {
972    const char *prefix_str = prefix.c_str();
973
974    /* stats for both read-only and read-write threads */
975    addStat(prefix_str, "backend_type",   "couchstore",       add_stat, c);
976    addStat(prefix_str, "open",           st.numOpen,         add_stat, c);
977    addStat(prefix_str, "close",          st.numClose,        add_stat, c);
978    addStat(prefix_str, "readTime",       st.readTimeHisto,   add_stat, c);
979    addStat(prefix_str, "readSize",       st.readSizeHisto,   add_stat, c);
980    addStat(prefix_str, "numLoadedVb",    st.numLoadedVb,     add_stat, c);
981
982    // failure stats
983    addStat(prefix_str, "failure_open",   st.numOpenFailure, add_stat, c);
984    addStat(prefix_str, "failure_get",    st.numGetFailure,  add_stat, c);
985
986    if (!isReadOnly()) {
987        addStat(prefix_str, "failure_set",   st.numSetFailure,   add_stat, c);
988        addStat(prefix_str, "failure_del",   st.numDelFailure,   add_stat, c);
989        addStat(prefix_str, "failure_vbset", st.numVbSetFailure, add_stat, c);
990        addStat(prefix_str, "lastCommDocs",  st.docsCommitted,   add_stat, c);
991    }
992
993    addStat(prefix_str, "io_num_read", st.io_num_read, add_stat, c);
994    addStat(prefix_str, "io_num_write", st.io_num_write, add_stat, c);
995    addStat(prefix_str, "io_read_bytes", st.io_read_bytes, add_stat, c);
996    addStat(prefix_str, "io_write_bytes", st.io_write_bytes, add_stat, c);
997
998    const size_t read = st.fsStats.totalBytesRead.load() +
999                        st.fsStatsCompaction.totalBytesRead.load();
1000    addStat(prefix_str, "io_total_read_bytes", read, add_stat, c);
1001
1002    const size_t written = st.fsStats.totalBytesWritten.load() +
1003                           st.fsStatsCompaction.totalBytesWritten.load();
1004    addStat(prefix_str, "io_total_write_bytes", written, add_stat, c);
1005
1006    addStat(prefix_str, "io_compaction_read_bytes",
1007            st.fsStatsCompaction.totalBytesRead, add_stat, c);
1008    addStat(prefix_str, "io_compaction_write_bytes",
1009            st.fsStatsCompaction.totalBytesWritten, add_stat, c);
1010}
1011
1012void CouchKVStore::addTimingStats(const std::string &prefix,
1013                                  ADD_STAT add_stat, const void *c) {
1014    const char *prefix_str = prefix.c_str();
1015    addStat(prefix_str, "commit",      st.commitHisto,      add_stat, c);
1016    addStat(prefix_str, "compact",     st.compactHisto,     add_stat, c);
1017    addStat(prefix_str, "snapshot",    st.snapshotHisto,    add_stat, c);
1018    addStat(prefix_str, "delete",      st.delTimeHisto,     add_stat, c);
1019    addStat(prefix_str, "save_documents", st.saveDocsHisto, add_stat, c);
1020    addStat(prefix_str, "writeTime",   st.writeTimeHisto,   add_stat, c);
1021    addStat(prefix_str, "writeSize",   st.writeSizeHisto,   add_stat, c);
1022    addStat(prefix_str, "bulkSize",    st.batchSize,        add_stat, c);
1023
1024    // Couchstore file ops stats
1025    addStat(prefix_str, "fsReadTime",  st.fsStats.readTimeHisto,  add_stat, c);
1026    addStat(prefix_str, "fsWriteTime", st.fsStats.writeTimeHisto, add_stat, c);
1027    addStat(prefix_str, "fsSyncTime",  st.fsStats.syncTimeHisto,  add_stat, c);
1028    addStat(prefix_str, "fsReadSize",  st.fsStats.readSizeHisto,  add_stat, c);
1029    addStat(prefix_str, "fsWriteSize", st.fsStats.writeSizeHisto, add_stat, c);
1030    addStat(prefix_str, "fsReadSeek",  st.fsStats.readSeekHisto,  add_stat, c);
1031}
1032
1033bool CouchKVStore::getStat(const char* name, size_t& value)  {
1034    if (strcmp("io_total_read_bytes", name) == 0) {
1035        value = st.fsStats.totalBytesRead.load() +
1036                st.fsStatsCompaction.totalBytesRead.load();
1037        return true;
1038    } else if (strcmp("io_total_write_bytes", name) == 0) {
1039        value = st.fsStats.totalBytesWritten.load() +
1040                st.fsStatsCompaction.totalBytesWritten.load();
1041        return true;
1042    } else if (strcmp("io_compaction_read_bytes", name) == 0) {
1043        value = st.fsStatsCompaction.totalBytesRead;
1044        return true;
1045    } else if (strcmp("io_compaction_write_bytes", name) == 0) {
1046        value = st.fsStatsCompaction.totalBytesWritten;
1047        return true;
1048    }
1049
1050    return false;
1051}
1052
1053template <typename T>
1054void CouchKVStore::addStat(const std::string &prefix, const char *stat, T &val,
1055                           ADD_STAT add_stat, const void *c) {
1056    std::stringstream fullstat;
1057    fullstat << prefix << ":" << stat;
1058    add_casted_stat(fullstat.str().c_str(), val, add_stat, c);
1059}
1060
1061void CouchKVStore::pendingTasks() {
1062    if (isReadOnly()) {
1063        throw std::logic_error("CouchKVStore::pendingTasks: Not valid on a "
1064                        "read-only object.");
1065    }
1066
1067    if (!pendingFileDeletions.empty()) {
1068        std::queue<std::string> queue;
1069        pendingFileDeletions.getAll(queue);
1070
1071        while (!queue.empty()) {
1072            std::string filename_str = queue.front();
1073            if (remove(filename_str.c_str()) == -1) {
1074                LOG(EXTENSION_LOG_WARNING, "Failed to remove file '%s' "
1075                    "with error code: %d", filename_str.c_str(), errno);
1076                if (errno != ENOENT) {
1077                    pendingFileDeletions.push(filename_str);
1078                }
1079            }
1080            queue.pop();
1081        }
1082    }
1083}
1084
1085ScanContext* CouchKVStore::initScanContext(std::shared_ptr<Callback<GetValue> > cb,
1086                                           std::shared_ptr<Callback<CacheLookup> > cl,
1087                                           uint16_t vbid, uint64_t startSeqno,
1088                                           DocumentFilter options,
1089                                           ValueFilter valOptions) {
1090    Db *db = NULL;
1091    uint64_t rev = dbFileRevMap[vbid];
1092    couchstore_error_t errorCode = openDB(vbid, rev, &db,
1093                                          COUCHSTORE_OPEN_FLAG_RDONLY);
1094    if (errorCode != COUCHSTORE_SUCCESS) {
1095        LOG(EXTENSION_LOG_WARNING, "Failed to open database, "
1096            "name=%s/%" PRIu16 ".couch.%" PRIu64, dbname.c_str(), vbid, rev);
1097        remVBucketFromDbFileMap(vbid);
1098        return NULL;
1099    }
1100
1101    DbInfo info;
1102    errorCode = couchstore_db_info(db, &info);
1103    if (errorCode != COUCHSTORE_SUCCESS) {
1104        closeDatabaseHandle(db);
1105        throw std::runtime_error("Failed to read DB info for backfill. vb:" +
1106                                 std::to_string(vbid) + " rev:" +
1107                                 std::to_string(rev));
1108    }
1109
1110    uint64_t count = 0;
1111    errorCode = couchstore_changes_count(db,
1112                                         startSeqno,
1113                                         std::numeric_limits<uint64_t>::max(),
1114                                         &count);
1115    if (errorCode != COUCHSTORE_SUCCESS) {
1116        std::string err("CouchKVStore::initScanContext:Failed to obtain changes "
1117                        "count with error: " +
1118                        std::string(couchstore_strerror(errorCode)));
1119        closeDatabaseHandle(db);
1120        throw std::runtime_error(err);
1121    }
1122
1123    size_t backfillId = backfillCounter++;
1124
1125    LockHolder lh(backfillLock);
1126    backfills[backfillId] = db;
1127
1128    return new ScanContext(cb, cl, vbid, backfillId, startSeqno,
1129                           info.last_sequence, options,
1130                           valOptions, count);
1131}
1132
1133scan_error_t CouchKVStore::scan(ScanContext* ctx) {
1134    if (!ctx) {
1135        return scan_failed;
1136    }
1137
1138    if (ctx->lastReadSeqno == ctx->maxSeqno) {
1139        return scan_success;
1140    }
1141
1142    LockHolder lh(backfillLock);
1143    std::map<size_t, Db*>::iterator itr = backfills.find(ctx->scanId);
1144    if (itr == backfills.end()) {
1145        return scan_failed;
1146    }
1147
1148    Db* db = itr->second;
1149    lh.unlock();
1150
1151    couchstore_docinfos_options options;
1152    switch (ctx->docFilter) {
1153        case DocumentFilter::NO_DELETES:
1154            options = COUCHSTORE_NO_DELETES;
1155            break;
1156        case DocumentFilter::ONLY_DELETES:
1157            options = COUCHSTORE_DELETES_ONLY;
1158            break;
1159        case DocumentFilter::ALL_ITEMS:
1160            options = COUCHSTORE_NO_OPTIONS;
1161            break;
1162        default:
1163            std::string err("CouchKVStore::scan:Illegal document filter!" +
1164                            std::to_string(static_cast<int>(ctx->docFilter)));
1165            throw std::runtime_error(err);
1166    }
1167
1168    uint64_t start = ctx->startSeqno;
1169    if (ctx->lastReadSeqno != 0) {
1170        start = ctx->lastReadSeqno + 1;
1171    }
1172
1173    couchstore_error_t errorCode;
1174    errorCode = couchstore_changes_since(db, start, options, recordDbDumpC,
1175                                         static_cast<void*>(ctx));
1176    if (errorCode != COUCHSTORE_SUCCESS) {
1177        if (errorCode == COUCHSTORE_ERROR_CANCEL) {
1178            return scan_again;
1179        } else {
1180            LOG(EXTENSION_LOG_WARNING,
1181                "couchstore_changes_since failed, error=%s [%s]",
1182                couchstore_strerror(errorCode),
1183                couchkvstore_strerrno(db, errorCode).c_str());
1184            remVBucketFromDbFileMap(ctx->vbid);
1185            return scan_failed;
1186        }
1187    }
1188    return scan_success;
1189}
1190
1191void CouchKVStore::destroyScanContext(ScanContext* ctx) {
1192    if (!ctx) {
1193        return;
1194    }
1195
1196    LockHolder lh(backfillLock);
1197    std::map<size_t, Db*>::iterator itr = backfills.find(ctx->scanId);
1198    if (itr != backfills.end()) {
1199        closeDatabaseHandle(itr->second);
1200        backfills.erase(itr);
1201    }
1202    delete ctx;
1203}
1204
1205void CouchKVStore::close() {
1206    intransaction = false;
1207}
1208
1209uint64_t CouchKVStore::checkNewRevNum(std::string &dbFileName, bool newFile) {
1210    uint64_t newrev = 0;
1211    std::string nameKey;
1212
1213    if (!newFile) {
1214        // extract out the file revision number first
1215        size_t secondDot = dbFileName.rfind(".");
1216        nameKey = dbFileName.substr(0, secondDot);
1217    } else {
1218        nameKey = dbFileName;
1219    }
1220    nameKey.append(".");
1221    const std::vector<std::string> files = findFilesWithPrefix(nameKey);
1222    std::vector<std::string>::const_iterator itor;
1223    // found file(s) whoes name has the same key name pair with different
1224    // revision number
1225    for (itor = files.begin(); itor != files.end(); ++itor) {
1226        const std::string &filename = *itor;
1227        if (endWithCompact(filename)) {
1228            continue;
1229        }
1230
1231        size_t secondDot = filename.rfind(".");
1232        char *ptr = NULL;
1233        uint64_t revnum = strtoull(filename.substr(secondDot + 1).c_str(), &ptr, 10);
1234        if (newrev < revnum) {
1235            newrev = revnum;
1236            dbFileName = filename;
1237        }
1238    }
1239    return newrev;
1240}
1241
1242void CouchKVStore::updateDbFileMap(uint16_t vbucketId, uint64_t newFileRev) {
1243    if (vbucketId >= numDbFiles) {
1244        LOG(EXTENSION_LOG_WARNING,
1245            "Cannot update db file map for an invalid vbucket, "
1246            "vbucket id = %d, rev = %" PRIu64, vbucketId, newFileRev);
1247        return;
1248    }
1249
1250    dbFileRevMap[vbucketId] = newFileRev;
1251}
1252
/**
 * Open the couchstore file for a vbucket, probing for newer revisions or
 * creating the file as needed.
 *
 * @param vbucketId  vbucket whose file is opened
 * @param fileRev    revision the caller believes is current; the initial
 *                   file name is built from it via getDBFileName()
 * @param db         out: handle to the opened database
 * @param options    couchstore open flags
 * @param newFileRev out (may be NULL): the larger of fileRev and the
 *                   revision actually discovered/used
 * @param reset      if true, open directly with `options` and skip the
 *                   revision-probing logic
 * @param ops        file-ops table; NULL selects statCollectingFileOps
 * @return the couchstore status of the final open attempt. st.numOpen is
 *         always incremented; st.numOpenFailure on failure.
 */
couchstore_error_t CouchKVStore::openDB(uint16_t vbucketId,
                                        uint64_t fileRev,
                                        Db **db,
                                        uint64_t options,
                                        uint64_t *newFileRev,
                                        bool reset,
                                        const couch_file_ops* ops) {
    std::string dbFileName = getDBFileName(dbname, vbucketId, fileRev);

    // Default to the stat-collecting file ops.
    if(ops == nullptr) {
        ops = &statCollectingFileOps;
    }

    uint64_t newRevNum = fileRev;
    couchstore_error_t errorCode = COUCHSTORE_SUCCESS;

    if (reset) {
        // Reset: open exactly the requested file with the caller's flags.
        errorCode = couchstore_open_db_ex(dbFileName.c_str(), options,
                                          ops, db);
        if (errorCode == COUCHSTORE_SUCCESS) {
            // NOTE(review): newRevNum is forced to 1 while the map is set to
            // fileRev; if fileRev is 0 the update at the end of this function
            // then bumps the map to 1. Confirm this is intended.
            newRevNum = 1;
            updateDbFileMap(vbucketId, fileRev);
            LOG(EXTENSION_LOG_INFO,
                "reset: created new couchstore file, name=%s rev=%" PRIu64,
                dbFileName.c_str(), fileRev);
        } else {
            LOG(EXTENSION_LOG_WARNING,
                "reset: creating a new couchstore file,"
                "name=%s rev=%" PRIu64 " failed with error=%s", dbFileName.c_str(),
                fileRev, couchstore_strerror(errorCode));
        }
    } else {
        // Only an exact COUCHSTORE_OPEN_FLAG_CREATE takes the probe/create
        // path; any other flag combination goes straight to openDB_retry().
        if (options == COUCHSTORE_OPEN_FLAG_CREATE) {
            // first try to open the requested file without the
            // create option in case it does already exist
            errorCode = couchstore_open_db_ex(dbFileName.c_str(), 0, ops, db);
            if (errorCode != COUCHSTORE_SUCCESS) {
                // open_db failed but still check if the file exists
                // (checkNewRevNum also rewrites dbFileName to the newest
                // revision it finds).
                newRevNum = checkNewRevNum(dbFileName);
                bool fileExists = (newRevNum) ? true : false;
                if (fileExists) {
                    errorCode = openDB_retry(dbFileName, 0, ops, db,
                                             &newRevNum);
                } else {
                    // requested file doesn't seem to exist, just create one
                    errorCode = couchstore_open_db_ex(dbFileName.c_str(),
                                                      options, ops, db);
                    if (errorCode == COUCHSTORE_SUCCESS) {
                        newRevNum = 1;
                        updateDbFileMap(vbucketId, fileRev);
                        LOG(EXTENSION_LOG_INFO,
                            "INFO: created new couch db file, name=%s rev=%" PRIu64,
                            dbFileName.c_str(), fileRev);
                    }
                }
            }
        } else {
            errorCode = openDB_retry(dbFileName, options, ops, db,
                                     &newRevNum);
        }
    }

    /* update command statistics */
    st.numOpen++;
    if (errorCode) {
        st.numOpenFailure++;
        LOG(EXTENSION_LOG_WARNING, "couchstore_open_db failed, name=%s"
            " option=%" PRIX64 " rev=%" PRIu64 " error=%s [%s]",
            dbFileName.c_str(), options,
            ((newRevNum > fileRev) ? newRevNum : fileRev),
            couchstore_strerror(errorCode),
            cb_strerror().c_str());
    } else {
        if (newRevNum > fileRev) {
            // new revision number found, update it
            updateDbFileMap(vbucketId, newRevNum);
        }
    }

    // Report the larger of the requested and discovered revisions.
    if (newFileRev != NULL) {
        *newFileRev = (newRevNum > fileRev) ? newRevNum : fileRev;
    }
    return errorCode;
}
1337
1338couchstore_error_t CouchKVStore::openDB_retry(std::string &dbfile,
1339                                              uint64_t options,
1340                                              const couch_file_ops *ops,
1341                                              Db** db, uint64_t *newFileRev) {
1342    int retry = 0;
1343    couchstore_error_t errCode = COUCHSTORE_SUCCESS;
1344
1345    while (retry < MAX_OPEN_DB_RETRY) {
1346        errCode = couchstore_open_db_ex(dbfile.c_str(), options, ops, db);
1347        if (errCode == COUCHSTORE_SUCCESS) {
1348            return errCode;
1349        }
1350        LOG(EXTENSION_LOG_NOTICE, "INFO: couchstore_open_db failed, name=%s "
1351            "options=%" PRIX64 " error=%s [%s], try it again!",
1352            dbfile.c_str(), options, couchstore_strerror(errCode),
1353            cb_strerror().c_str());
1354        *newFileRev = checkNewRevNum(dbfile);
1355        ++retry;
1356        if (retry == MAX_OPEN_DB_RETRY - 1 && options == 0 &&
1357            errCode == COUCHSTORE_ERROR_NO_SUCH_FILE) {
1358            options = COUCHSTORE_OPEN_FLAG_CREATE;
1359        }
1360    }
1361    return errCode;
1362}
1363
/**
 * Record the highest revision of each vbucket data file found in
 * `filenames`, removing lower-revision duplicates from disk.
 *
 * Each name is parsed as "<dir>/<vbid>.couch.<rev>". When <vbid> is all
 * digits, the vbid is appended to `vbids` (if given; once per matching file,
 * so duplicates are possible). dbFileRevMap then keeps the larger of the
 * recorded and parsed revisions. If a different revision was already
 * recorded, the lower-revision file is removed when it exists
 * (read-write instances only). Other names (e.g. master.couch) are skipped.
 *
 * NOTE(review): vbId is not bounds-checked before indexing dbFileRevMap,
 * unlike updateDbFileMap()'s numDbFiles check; confirm file names cannot
 * carry out-of-range vbids.
 */
void CouchKVStore::populateFileNameMap(std::vector<std::string> &filenames,
                                       std::vector<uint16_t> *vbids) {
    std::vector<std::string>::iterator fileItr;

    for (fileItr = filenames.begin(); fileItr != filenames.end(); ++fileItr) {
        const std::string &filename = *fileItr;
        // Split "<dir>/<vbid>.couch.<rev>" into its components.
        size_t secondDot = filename.rfind(".");
        std::string nameKey = filename.substr(0, secondDot);
        size_t firstDot = nameKey.rfind(".");
#ifdef _MSC_VER
        size_t firstSlash = nameKey.rfind("\\");
#else
        size_t firstSlash = nameKey.rfind("/");
#endif

        std::string revNumStr = filename.substr(secondDot + 1);
        char *ptr = NULL;
        uint64_t revNum = strtoull(revNumStr.c_str(), &ptr, 10);

        std::string vbIdStr = nameKey.substr(firstSlash + 1,
                                            (firstDot - firstSlash) - 1);
        if (allDigit(vbIdStr)) {
            int vbId = atoi(vbIdStr.c_str());
            if (vbids) {
                vbids->push_back(static_cast<uint16_t>(vbId));
            }
            uint64_t old_rev_num = dbFileRevMap[vbId];
            if (old_rev_num == revNum) {
                continue;
            } else if (old_rev_num < revNum) { // stale revision found
                dbFileRevMap[vbId] = revNum;
            } else { // stale file found (revision id has rolled over)
                old_rev_num = revNum;
            }
            // old_rev_num now names the lower of the two revisions; remove
            // that file. Note the path is rebuilt with "/" on all platforms.
            std::stringstream old_file;
            old_file << dbname << "/" << vbId << ".couch." << old_rev_num;
            if (access(old_file.str().c_str(), F_OK) == 0) {
                if (!isReadOnly()) {
                    if (remove(old_file.str().c_str()) == 0) {
                        LOG(EXTENSION_LOG_INFO, "Removed stale file '%s'",
                            old_file.str().c_str());
                    } else {
                        LOG(EXTENSION_LOG_WARNING,
                            "Warning: Failed to remove the stale file '%s': %s",
                            old_file.str().c_str(), cb_strerror().c_str());
                    }
                } else {
                    LOG(EXTENSION_LOG_WARNING,
                        "A read-only instance of the underlying store was not "
                        "allowed to delete a stale file: %s!",
                        old_file.str().c_str());
                }
            }
        } else {
            // skip non-vbucket database file, master.couch etc
            LOG(EXTENSION_LOG_DEBUG,
                "Non-vbucket database file, %s, skip adding "
                "to CouchKVStore dbFileMap\n", filename.c_str());
        }
    }
}
1425
1426couchstore_error_t CouchKVStore::fetchDoc(Db *db, DocInfo *docinfo,
1427                                          GetValue &docValue, uint16_t vbId,
1428                                          bool metaOnly, bool fetchDelete) {
1429    couchstore_error_t errCode = COUCHSTORE_SUCCESS;
1430    sized_buf metadata = docinfo->rev_meta;
1431    uint32_t itemFlags = 0;
1432    uint64_t cas = 0;
1433    time_t exptime = 0;
1434    uint8_t ext_meta[EXT_META_LEN];
1435    uint8_t ext_len = 0;
1436
1437    if (metadata.size < DEFAULT_META_LEN) {
1438        throw std::invalid_argument("CouchKVStore::fetchDoc: "
1439                        "docValue->rev_meta.size (which is " +
1440                        std::to_string(metadata.size) +
1441                        ") is less than DEFAULT_META_LEN (which is " +
1442                        std::to_string(DEFAULT_META_LEN) + ")");
1443    }
1444
1445    if (metadata.size >= DEFAULT_META_LEN) {
1446        memcpy(&cas, (metadata.buf), 8);
1447        memcpy(&exptime, (metadata.buf) + 8, 4);
1448        memcpy(&itemFlags, (metadata.buf) + 12, 4);
1449        ext_len = 0;
1450    }
1451
1452    if (metadata.size >= V1_META_LEN) {
1453        memcpy(ext_meta, (metadata.buf) + DEFAULT_META_LEN + FLEX_DATA_OFFSET,
1454               EXT_META_LEN);
1455        ext_len = EXT_META_LEN;
1456    }
1457
1458    cas = ntohll(cas);
1459    exptime = ntohl(exptime);
1460
1461    if (metaOnly || (fetchDelete && docinfo->deleted)) {
1462        Item *it = new Item(docinfo->id.buf, (size_t)docinfo->id.size,
1463                            itemFlags, (time_t)exptime, NULL, docinfo->size,
1464                            ext_meta, ext_len, cas, docinfo->db_seq, vbId);
1465        if (docinfo->deleted) {
1466            it->setDeleted();
1467        }
1468
1469        it->setRevSeqno(docinfo->rev_seq);
1470        docValue = GetValue(it);
1471        // update ep-engine IO stats
1472        ++st.io_num_read;
1473        st.io_read_bytes.fetch_add(docinfo->id.size);
1474    } else {
1475        Doc *doc = NULL;
1476        errCode = couchstore_open_doc_with_docinfo(db, docinfo, &doc,
1477                                                   DECOMPRESS_DOC_BODIES);
1478        if (errCode == COUCHSTORE_SUCCESS) {
1479            if (docinfo->deleted) {
1480                // do not read a doc that is marked deleted, just return the
1481                // error code as not found but still release the document body.
1482                errCode = COUCHSTORE_ERROR_DOC_NOT_FOUND;
1483            } else {
1484                if (doc == nullptr) {
1485                    throw std::logic_error("CouchKVStore::fetchDoc: doc is NULL");
1486                }
1487                if (doc->id.size > UINT16_MAX) {
1488                    throw std::logic_error("CouchKVStore::fetchDoc: "
1489                            "doc->id.size (which is" +
1490                            std::to_string(doc->id.size) + ") is greater than "
1491                            + std::to_string(UINT16_MAX));
1492                }
1493
1494                size_t valuelen = doc->data.size;
1495                void *valuePtr = doc->data.buf;
1496
1497                /**
1498                 * Set Datatype correctly if data is being
1499                 * read from couch files where datatype is
1500                 * not supported.
1501                 */
1502                if (metadata.size == DEFAULT_META_LEN) {
1503                    ext_len = EXT_META_LEN;
1504                    ext_meta[0] = determine_datatype((const unsigned char*)valuePtr,
1505                                                     valuelen);
1506                }
1507
1508                Item *it = new Item(docinfo->id.buf, (size_t)docinfo->id.size,
1509                                    itemFlags, (time_t)exptime, valuePtr, valuelen,
1510                                    ext_meta, ext_len, cas, docinfo->db_seq, vbId,
1511                                    docinfo->rev_seq);
1512
1513                docValue = GetValue(it);
1514
1515                // update ep-engine IO stats
1516                ++st.io_num_read;
1517                st.io_read_bytes.fetch_add(docinfo->id.size + valuelen);
1518            }
1519            couchstore_free_document(doc);
1520        }
1521    }
1522    return errCode;
1523}
1524
1525int CouchKVStore::recordDbDump(Db *db, DocInfo *docinfo, void *ctx) {
1526
1527    ScanContext* sctx = static_cast<ScanContext*>(ctx);
1528    std::shared_ptr<Callback<GetValue> > cb = sctx->callback;
1529    std::shared_ptr<Callback<CacheLookup> > cl = sctx->lookup;
1530
1531    Doc *doc = NULL;
1532    void *valuePtr = NULL;
1533    size_t valuelen = 0;
1534    uint64_t byseqno = docinfo->db_seq;
1535    sized_buf  metadata = docinfo->rev_meta;
1536    uint16_t vbucketId = sctx->vbid;
1537    sized_buf key = docinfo->id;
1538    uint32_t itemflags = 0;
1539    uint64_t cas = 0;
1540    uint32_t exptime = 0;
1541    uint8_t ext_meta[EXT_META_LEN] = {0};
1542    uint8_t ext_len = 0;
1543
1544    if (key.size > UINT16_MAX) {
1545        throw std::invalid_argument("CouchKVStore::recordDbDump: "
1546                        "docinfo->id.size (which is " + std::to_string(key.size) +
1547                        ") is greater than " + std::to_string(UINT16_MAX));
1548    }
1549    if (metadata.size < DEFAULT_META_LEN) {
1550        throw std::invalid_argument("CouchKVStore::recordDbDump: "
1551                        "docinfo->rev_meta.size (which is " + std::to_string(key.size) +
1552                        ") is less than " + std::to_string(DEFAULT_META_LEN));
1553    }
1554
1555    std::string docKey(docinfo->id.buf, docinfo->id.size);
1556    CacheLookup lookup(docKey, byseqno, vbucketId);
1557    cl->callback(lookup);
1558    if (cl->getStatus() == ENGINE_KEY_EEXISTS) {
1559        sctx->lastReadSeqno = byseqno;
1560        return COUCHSTORE_SUCCESS;
1561    } else if (cl->getStatus() == ENGINE_ENOMEM) {
1562        return COUCHSTORE_ERROR_CANCEL;
1563    }
1564
1565    if (metadata.size >= DEFAULT_META_LEN) {
1566        memcpy(&cas, (metadata.buf), 8);
1567        memcpy(&exptime, (metadata.buf) + 8, 4);
1568        memcpy(&itemflags, (metadata.buf) + 12, 4);
1569        ext_len = 0;
1570    }
1571
1572    if (metadata.size >= V1_META_LEN) {
1573        memcpy(ext_meta, (metadata.buf) + DEFAULT_META_LEN + FLEX_DATA_OFFSET,
1574               EXT_META_LEN);
1575        ext_len = EXT_META_LEN;
1576    }
1577
1578    exptime = ntohl(exptime);
1579    cas = ntohll(cas);
1580
1581    if (sctx->valFilter != ValueFilter::KEYS_ONLY && !docinfo->deleted) {
1582        couchstore_error_t errCode;
1583        bool expectCompressed = false;
1584        /**
1585         * If couch files do not support datatype or no special
1586         * request is made to retrieve compressed documents as is,
1587         * then DECOMPRESS the document.
1588         */
1589        couchstore_open_options openOptions = 0;
1590        if (metadata.size == DEFAULT_META_LEN ||
1591            sctx->valFilter == ValueFilter::VALUES_DECOMPRESSED) {
1592            openOptions = DECOMPRESS_DOC_BODIES;
1593        } else {
1594            // => sctx->valFilter == ValueFilter::VALUES_COMPRESSED
1595            expectCompressed = true;
1596        }
1597        errCode = couchstore_open_doc_with_docinfo(db, docinfo, &doc, openOptions);
1598
1599        if (errCode == COUCHSTORE_SUCCESS) {
1600            if (doc->data.size) {
1601                valuelen = doc->data.size;
1602                valuePtr = doc->data.buf;
1603
1604                /**
1605                 * Set Datatype correctly if data is being
1606                 * read from couch files where datatype is
1607                 * not supported.
1608                 */
1609                if (metadata.size == DEFAULT_META_LEN) {
1610                    ext_len = EXT_META_LEN;
1611                    ext_meta[0] = determine_datatype((const unsigned char*)valuePtr,
1612                                                     valuelen);
1613                }
1614
1615                if (expectCompressed) {
1616                    /**
1617                     * If a compressed document was retrieved as is,
1618                     * update the datatype of the document.
1619                     */
1620                    uint8_t datatype = ext_meta[0];
1621                    if (datatype == PROTOCOL_BINARY_DATATYPE_JSON) {
1622                        ext_meta[0] = PROTOCOL_BINARY_DATATYPE_COMPRESSED_JSON;
1623                    } else if (datatype == PROTOCOL_BINARY_RAW_BYTES) {
1624                        ext_meta[0] = PROTOCOL_BINARY_DATATYPE_COMPRESSED;
1625                    }
1626                }
1627            }
1628        } else {
1629            LOG(EXTENSION_LOG_WARNING,
1630                "Failed to retrieve key value from database "
1631                "database, vBucket=%d key=%s error=%s [%s]\n",
1632                vbucketId, key.buf, couchstore_strerror(errCode),
1633                couchkvstore_strerrno(db, errCode).c_str());
1634            return COUCHSTORE_SUCCESS;
1635        }
1636    }
1637
1638    Item *it = new Item((void *)key.buf,
1639                        key.size,
1640                        itemflags,
1641                        (time_t)exptime,
1642                        valuePtr, valuelen,
1643                        ext_meta, ext_len,
1644                        cas,
1645                        docinfo->db_seq, // return seq number being persisted on disk
1646                        vbucketId,
1647                        docinfo->rev_seq);
1648    if (docinfo->deleted) {
1649        it->setDeleted();
1650    }
1651
1652    bool onlyKeys = (sctx->valFilter == ValueFilter::KEYS_ONLY) ? true : false;
1653    GetValue rv(it, ENGINE_SUCCESS, -1, onlyKeys);
1654    cb->callback(rv);
1655
1656    couchstore_free_document(doc);
1657
1658    if (cb->getStatus() == ENGINE_ENOMEM) {
1659        return COUCHSTORE_ERROR_CANCEL;
1660    }
1661
1662    sctx->lastReadSeqno = byseqno;
1663    return COUCHSTORE_SUCCESS;
1664}
1665
1666bool CouchKVStore::commit2couchstore(Callback<kvstats_ctx> *cb) {
1667    bool success = true;
1668
1669    size_t pendingCommitCnt = pendingReqsQ.size();
1670    if (pendingCommitCnt == 0) {
1671        return success;
1672    }
1673
1674    Doc **docs = new Doc *[pendingCommitCnt];
1675    DocInfo **docinfos = new DocInfo *[pendingCommitCnt];
1676
1677    if (pendingReqsQ[0] == nullptr) {
1678        throw std::logic_error("CouchKVStore::commit2couchstore: "
1679                        "pendingReqsQ[0] is NULL");
1680    }
1681    uint16_t vbucket2flush = pendingReqsQ[0]->getVBucketId();
1682    uint64_t fileRev = pendingReqsQ[0]->getRevNum();
1683    for (size_t i = 0; i < pendingCommitCnt; ++i) {
1684        CouchRequest *req = pendingReqsQ[i];
1685        if (req == nullptr) {
1686            throw std::logic_error("CouchKVStore::commit2couchstore: "
1687                                       "pendingReqsQ["
1688                                       + std::to_string(i) + "] is NULL");
1689        }
1690        docs[i] = (Doc *)req->getDbDoc();
1691        docinfos[i] = req->getDbDocInfo();
1692        if (vbucket2flush != req->getVBucketId()) {
1693            throw std::logic_error(
1694                    "CouchKVStore::commit2couchstore: "
1695                    "mismatch between vbucket2flush (which is "
1696                    + std::to_string(vbucket2flush) + ") and pendingReqsQ["
1697                    + std::to_string(i) + "] (which is "
1698                    + std::to_string(req->getVBucketId()) + ")");
1699        }
1700    }
1701
1702    kvstats_ctx kvctx;
1703    kvctx.vbucket = vbucket2flush;
1704    // flush all
1705    couchstore_error_t errCode = saveDocs(vbucket2flush, fileRev, docs,
1706                                          docinfos, pendingCommitCnt,
1707                                          kvctx);
1708    if (errCode) {
1709        success = false;
1710        LOG(EXTENSION_LOG_WARNING,
1711            "Commit failed, cannot save CouchDB docs "
1712            "for vbucket = %d rev = %" PRIu64, vbucket2flush, fileRev);
1713    }
1714    if (cb) {
1715        cb->callback(kvctx);
1716    }
1717    commitCallback(pendingReqsQ, kvctx, errCode);
1718
1719    // clean up
1720    for (size_t i = 0; i < pendingCommitCnt; ++i) {
1721        delete pendingReqsQ[i];
1722    }
1723    pendingReqsQ.clear();
1724    delete [] docs;
1725    delete [] docinfos;
1726    return success;
1727}
1728
1729static int readDocInfos(Db *db, DocInfo *docinfo, void *ctx) {
1730    if (ctx == nullptr) {
1731        throw std::invalid_argument("readDocInfos: ctx must be non-NULL");
1732    }
1733    kvstats_ctx *cbCtx = static_cast<kvstats_ctx *>(ctx);
1734    if(docinfo) {
1735        // An item exists in the VB DB file.
1736        if (!docinfo->deleted) {
1737            std::string key(docinfo->id.buf, docinfo->id.size);
1738            std::unordered_map<std::string, kstat_entry_t>::iterator itr =
1739                cbCtx->keyStats.find(key);
1740            if (itr != cbCtx->keyStats.end()) {
1741                itr->second.first = true;
1742            }
1743        }
1744    }
1745    return 0;
1746}
1747
1748couchstore_error_t CouchKVStore::saveDocs(uint16_t vbid, uint64_t rev,
1749                                          Doc **docs, DocInfo **docinfos,
1750                                          size_t docCount, kvstats_ctx &kvctx) {
1751    couchstore_error_t errCode;
1752    uint64_t fileRev = rev;
1753    DbInfo info;
1754    if (rev == 0) {
1755        throw std::invalid_argument("CouchKVStore::saveDocs: rev must be non-zero");
1756    }
1757
1758    Db *db = NULL;
1759    uint64_t newFileRev;
1760    errCode = openDB(vbid, fileRev, &db, 0, &newFileRev);
1761    if (errCode != COUCHSTORE_SUCCESS) {
1762        LOG(EXTENSION_LOG_WARNING,
1763                "Failed to open database, vbucketId = %d "
1764                "fileRev = %" PRIu64 " numDocs = %" PRIu64, vbid, fileRev,
1765                uint64_t(docCount));
1766        return errCode;
1767    } else {
1768        vbucket_state *state = cachedVBStates[vbid];
1769        if (state == nullptr) {
1770            throw std::logic_error(
1771                    "CouchKVStore::saveDocs: cachedVBStates[" +
1772                    std::to_string(vbid) + "] is NULL");
1773        }
1774
1775        uint64_t maxDBSeqno = 0;
1776        sized_buf *ids = new sized_buf[docCount];
1777        for (size_t idx = 0; idx < docCount; idx++) {
1778            ids[idx] = docinfos[idx]->id;
1779            maxDBSeqno = std::max(maxDBSeqno, docinfos[idx]->db_seq);
1780            std::string key(ids[idx].buf, ids[idx].size);
1781            kvctx.keyStats[key] = std::make_pair(false,
1782                    !docinfos[idx]->deleted);
1783        }
1784        couchstore_docinfos_by_id(db, ids, (unsigned) docCount, 0,
1785                readDocInfos, &kvctx);
1786        delete[] ids;
1787
1788        hrtime_t cs_begin = gethrtime();
1789        uint64_t flags = COMPRESS_DOC_BODIES | COUCHSTORE_SEQUENCE_AS_IS;
1790        errCode = couchstore_save_documents(db, docs, docinfos,
1791                (unsigned) docCount, flags);
1792        st.saveDocsHisto.add((gethrtime() - cs_begin) / 1000);
1793        if (errCode != COUCHSTORE_SUCCESS) {
1794            LOG(EXTENSION_LOG_WARNING,
1795                    "Failed to save docs to database, "
1796                    "numDocs = %" PRIu64 " error=%s [%s]\n",
1797                    uint64_t(docCount), couchstore_strerror(errCode),
1798                    couchkvstore_strerrno(db, errCode).c_str());
1799            closeDatabaseHandle(db);
1800            return errCode;
1801        }
1802
1803        errCode = saveVBState(db, *state);
1804        if (errCode != COUCHSTORE_SUCCESS) {
1805            LOG(EXTENSION_LOG_WARNING, "Failed to save local docs to "
1806                "database, error=%s [%s]", couchstore_strerror(errCode),
1807                couchkvstore_strerrno(db, errCode).c_str());
1808                closeDatabaseHandle(db);
1809                return errCode;
1810        }
1811
1812        cs_begin = gethrtime();
1813        errCode = couchstore_commit(db);
1814        st.commitHisto.add((gethrtime() - cs_begin) / 1000);
1815        if (errCode) {
1816            LOG(EXTENSION_LOG_WARNING,
1817                    "couchstore_commit failed, error=%s [%s]",
1818                    couchstore_strerror(errCode),
1819                    couchkvstore_strerrno(db, errCode).c_str());
1820            closeDatabaseHandle(db);
1821            return errCode;
1822        }
1823
1824        st.batchSize.add(docCount);
1825
1826        // retrieve storage system stats for file fragmentation computation
1827        couchstore_db_info(db, &info);
1828        kvctx.fileSpaceUsed = info.space_used;
1829        kvctx.fileSize = info.file_size;
1830        cachedDeleteCount[vbid] = info.deleted_count;
1831        cachedDocCount[vbid] = info.doc_count;
1832
1833        if (maxDBSeqno != info.last_sequence) {
1834            LOG(EXTENSION_LOG_WARNING, "Seqno in db header (%" PRIu64 ")"
1835                " is not matched with what was persisted (%" PRIu64 ")"
1836                " for vbucket %d",
1837                info.last_sequence, maxDBSeqno, vbid);
1838        }
1839        state->highSeqno = info.last_sequence;
1840
1841        closeDatabaseHandle(db);
1842    }
1843
1844    /* update stat */
1845    if(errCode == COUCHSTORE_SUCCESS) {
1846        st.docsCommitted = docCount;
1847    }
1848
1849    return errCode;
1850}
1851
1852void CouchKVStore::remVBucketFromDbFileMap(uint16_t vbucketId) {
1853    if (vbucketId >= numDbFiles) {
1854        LOG(EXTENSION_LOG_WARNING,
1855            "Cannot remove db file map entry for an invalid vbucket, "
1856            "vbucket id = %d\n", vbucketId);
1857        return;
1858    }
1859
1860    // just reset revision number of the requested vbucket
1861    dbFileRevMap[vbucketId] = 1;
1862}
1863
1864void CouchKVStore::commitCallback(std::vector<CouchRequest *> &committedReqs,
1865                                  kvstats_ctx &kvctx,
1866                                  couchstore_error_t errCode) {
1867    size_t commitSize = committedReqs.size();
1868
1869    for (size_t index = 0; index < commitSize; index++) {
1870        size_t dataSize = committedReqs[index]->getNBytes();
1871        size_t keySize = committedReqs[index]->getKey().length();
1872        /* update ep stats */
1873        ++st.io_num_write;
1874        st.io_write_bytes.fetch_add(keySize + dataSize);
1875
1876        if (committedReqs[index]->isDelete()) {
1877            int rv = getMutationStatus(errCode);
1878            if (rv != -1) {
1879                const std::string &key = committedReqs[index]->getKey();
1880                if (kvctx.keyStats[key].first) {
1881                    rv = 1; // Deletion is for an existing item on DB file.
1882                } else {
1883                    rv = 0; // Deletion is for a non-existing item on DB file.
1884                }
1885            }
1886            if (errCode) {
1887                ++st.numDelFailure;
1888            } else {
1889                st.delTimeHisto.add(committedReqs[index]->getDelta() / 1000);
1890            }
1891            committedReqs[index]->getDelCallback()->callback(rv);
1892        } else {
1893            int rv = getMutationStatus(errCode);
1894            const std::string &key = committedReqs[index]->getKey();
1895            bool insertion = !kvctx.keyStats[key].first;
1896            if (errCode) {
1897                ++st.numSetFailure;
1898            } else {
1899                st.writeTimeHisto.add(committedReqs[index]->getDelta() / 1000);
1900                st.writeSizeHisto.add(dataSize + keySize);
1901            }
1902            mutation_result p(rv, insertion);
1903            committedReqs[index]->getSetCallback()->callback(p);
1904        }
1905    }
1906}
1907
1908ENGINE_ERROR_CODE CouchKVStore::readVBState(Db *db, uint16_t vbId) {
1909    sized_buf id;
1910    LocalDoc *ldoc = NULL;
1911    couchstore_error_t errCode = COUCHSTORE_SUCCESS;
1912    vbucket_state_t state = vbucket_state_dead;
1913    uint64_t checkpointId = 0;
1914    uint64_t maxDeletedSeqno = 0;
1915    int64_t highSeqno = 0;
1916    std::string failovers;
1917    uint64_t purgeSeqno = 0;
1918    uint64_t lastSnapStart = 0;
1919    uint64_t lastSnapEnd = 0;
1920    uint64_t maxCas = 0;
1921
1922    DbInfo info;
1923    errCode = couchstore_db_info(db, &info);
1924    if (errCode == COUCHSTORE_SUCCESS) {
1925        highSeqno = info.last_sequence;
1926        purgeSeqno = info.purge_seq;
1927    } else {
1928        LOG(EXTENSION_LOG_WARNING,
1929            "CouchKVStore::readVBState:Failed to read database info "
1930            "for vbucket: %d with error: %s", vbId,
1931            couchstore_strerror(errCode));
1932        return couchErr2EngineErr(errCode);
1933    }
1934
1935    id.buf = (char *)"_local/vbstate";
1936    id.size = sizeof("_local/vbstate") - 1;
1937    errCode = couchstore_open_local_document(db, (void *)id.buf,
1938                                             id.size, &ldoc);
1939    if (errCode != COUCHSTORE_SUCCESS) {
1940        if (errCode == COUCHSTORE_ERROR_DOC_NOT_FOUND) {
1941            LOG(EXTENSION_LOG_NOTICE,
1942                "CouchKVStore::readVBState: '_local/vbstate' not found "
1943                "for vBucket: %d", vbId);
1944        } else {
1945            LOG(EXTENSION_LOG_WARNING,
1946                "CouchKVStore::readVBState: Failed to "
1947                "retrieve stat info for vBucket: %d with error: %s",
1948                vbId, couchstore_strerror(errCode));
1949        }
1950    } else {
1951        const std::string statjson(ldoc->json.buf, ldoc->json.size);
1952        cJSON *jsonObj = cJSON_Parse(statjson.c_str());
1953        if (!jsonObj) {
1954            couchstore_free_local_document(ldoc);
1955            LOG(EXTENSION_LOG_WARNING, "CouchKVStore::readVBState: Failed to "
1956                "parse the vbstat json doc for vbucket %d: %s",
1957                vbId , statjson.c_str());
1958            return couchErr2EngineErr(errCode);
1959        }
1960
1961        const std::string vb_state = getJSONObjString(
1962                                cJSON_GetObjectItem(jsonObj, "state"));
1963        const std::string checkpoint_id = getJSONObjString(
1964                                cJSON_GetObjectItem(jsonObj,"checkpoint_id"));
1965        const std::string max_deleted_seqno = getJSONObjString(
1966                                cJSON_GetObjectItem(jsonObj, "max_deleted_seqno"));
1967        const std::string snapStart = getJSONObjString(
1968                                cJSON_GetObjectItem(jsonObj, "snap_start"));
1969        const std::string snapEnd = getJSONObjString(
1970                                cJSON_GetObjectItem(jsonObj, "snap_end"));
1971        const std::string maxCasValue = getJSONObjString(
1972                                cJSON_GetObjectItem(jsonObj, "max_cas"));
1973        cJSON *failover_json = cJSON_GetObjectItem(jsonObj, "failover_table");
1974        if (vb_state.compare("") == 0 || checkpoint_id.compare("") == 0
1975                || max_deleted_seqno.compare("") == 0) {
1976            LOG(EXTENSION_LOG_WARNING, "CouchKVStore::readVBState: State JSON doc "
1977                "for vbucket: %d is in the wrong format: %s, vb state: %s,"
1978                "checkpoint id: %s and max deleted seqno: %s",
1979                vbId, statjson.c_str(), vb_state.c_str(),
1980                checkpoint_id.c_str(), max_deleted_seqno.c_str());
1981        } else {
1982            state = VBucket::fromString(vb_state.c_str());
1983            parseUint64(max_deleted_seqno.c_str(), &maxDeletedSeqno);
1984            parseUint64(checkpoint_id.c_str(), &checkpointId);
1985
1986            if (snapStart.compare("") == 0) {
1987                lastSnapStart = highSeqno;
1988            } else {
1989                parseUint64(snapStart.c_str(), &lastSnapStart);
1990            }
1991
1992            if (snapEnd.compare("") == 0) {
1993                lastSnapEnd = highSeqno;
1994            } else {
1995                parseUint64(snapEnd.c_str(), &lastSnapEnd);
1996            }
1997
1998            if (maxCasValue.compare("") != 0) {
1999                parseUint64(maxCasValue.c_str(), &maxCas);
2000
2001                // MB-17517: If the maxCas on disk was invalid then don't use it -
2002                // instead rebuild from the items we load from disk (i.e. as per
2003                // an upgrade from an earlier version).
2004                if (maxCas == static_cast<uint64_t>(-1)) {
2005                    LOG(EXTENSION_LOG_WARNING,
2006                        "Invalid max_cas (0x%" PRIx64 ") read from '%s' for "
2007                        "vbucket %" PRIu16 ". Resetting max_cas to zero.",
2008                        maxCas, id.buf, vbId);
2009                    maxCas = 0;
2010                }
2011            }
2012
2013            if (failover_json) {
2014                char* json = cJSON_PrintUnformatted(failover_json);
2015                failovers.assign(json);
2016                cJSON_Free(json);
2017            }
2018        }
2019        cJSON_Delete(jsonObj);
2020        couchstore_free_local_document(ldoc);
2021    }
2022
2023    delete cachedVBStates[vbId];
2024    cachedVBStates[vbId] = new vbucket_state(state, checkpointId,
2025                                             maxDeletedSeqno, highSeqno,
2026                                             purgeSeqno, lastSnapStart,
2027                                             lastSnapEnd, maxCas, failovers);
2028
2029    return couchErr2EngineErr(errCode);
2030}
2031
2032couchstore_error_t CouchKVStore::saveVBState(Db *db,
2033                                             const vbucket_state &vbState) {
2034    std::stringstream jsonState;
2035
2036    jsonState << "{\"state\": \"" << VBucket::toString(vbState.state) << "\""
2037              << ",\"checkpoint_id\": \"" << vbState.checkpointId << "\""
2038              << ",\"max_deleted_seqno\": \"" << vbState.maxDeletedSeqno << "\""
2039              << ",\"failover_table\": " << vbState.failovers
2040              << ",\"snap_start\": \"" << vbState.lastSnapStart << "\""
2041              << ",\"snap_end\": \"" << vbState.lastSnapEnd << "\""
2042              << ",\"max_cas\": \"" << vbState.maxCas << "\""
2043              << "}";
2044
2045    LocalDoc lDoc;
2046    lDoc.id.buf = (char *)"_local/vbstate";
2047    lDoc.id.size = sizeof("_local/vbstate") - 1;
2048    std::string state = jsonState.str();
2049    lDoc.json.buf = (char *)state.c_str();
2050    lDoc.json.size = state.size();
2051    lDoc.deleted = 0;
2052
2053    couchstore_error_t errCode = couchstore_save_local_document(db, &lDoc);
2054    if (errCode != COUCHSTORE_SUCCESS) {
2055        LOG(EXTENSION_LOG_WARNING,
2056            "couchstore_save_local_document failed "
2057            "error=%s [%s]\n", couchstore_strerror(errCode),
2058            couchkvstore_strerrno(db, errCode).c_str());
2059    }
2060    return errCode;
2061}
2062
2063int CouchKVStore::getMultiCb(Db *db, DocInfo *docinfo, void *ctx) {
2064    if (docinfo == nullptr) {
2065        throw std::invalid_argument("CouchKVStore::getMultiCb: docinfo "
2066                "must be non-NULL");
2067    }
2068    if (ctx == nullptr) {
2069        throw std::invalid_argument("CouchKVStore::getMultiCb: ctx must "
2070                "be non-NULL");
2071    }
2072
2073    std::string keyStr(docinfo->id.buf, docinfo->id.size);
2074    GetMultiCbCtx *cbCtx = static_cast<GetMultiCbCtx *>(ctx);
2075    CouchKVStoreStats &st = cbCtx->cks.getCKVStoreStat();
2076
2077    vb_bgfetch_queue_t::iterator qitr = cbCtx->fetches.find(keyStr);
2078    if (qitr == cbCtx->fetches.end()) {
2079        // this could be a serious race condition in couchstore,
2080        // log a warning message and continue
2081        LOG(EXTENSION_LOG_WARNING,
2082            "Couchstore returned invalid docinfo, "
2083            "no pending bgfetch has been issued for key = %s\n",
2084            keyStr.c_str());
2085        return 0;
2086    }
2087
2088    vb_bgfetch_item_ctx_t& bg_itm_ctx = (*qitr).second;
2089    bool meta_only = bg_itm_ctx.isMetaOnly;
2090
2091    GetValue returnVal;
2092
2093    couchstore_error_t errCode = cbCtx->cks.fetchDoc(db, docinfo, returnVal,
2094                                                     cbCtx->vbId, meta_only);
2095    if (errCode != COUCHSTORE_SUCCESS && !meta_only) {
2096        st.numGetFailure++;
2097    }
2098
2099    returnVal.setStatus(cbCtx->cks.couchErr2EngineErr(errCode));
2100
2101    std::list<VBucketBGFetchItem *> &fetches = bg_itm_ctx.bgfetched_list;
2102    std::list<VBucketBGFetchItem *>::iterator itr = fetches.begin();
2103
2104    bool return_val_ownership_transferred = false;
2105    for (itr = fetches.begin(); itr != fetches.end(); ++itr) {
2106        return_val_ownership_transferred = true;
2107        // populate return value for remaining fetch items with the
2108        // same seqid
2109        (*itr)->value = returnVal;
2110        st.readTimeHisto.add((gethrtime() - (*itr)->initTime) / 1000);
2111        if (errCode == COUCHSTORE_SUCCESS) {
2112            st.readSizeHisto.add(returnVal.getValue()->getNKey() +
2113                                 returnVal.getValue()->getNBytes());
2114        }
2115    }
2116    if (!return_val_ownership_transferred) {
2117        LOG(EXTENSION_LOG_WARNING, "CouchKVStore::getMultiCb called with zero"
2118            "items in bgfetched_list, vBucket=%d key=%s",
2119            cbCtx->vbId, keyStr.c_str());
2120        delete returnVal.getValue();
2121    }
2122
2123    return 0;
2124}
2125
2126
2127void CouchKVStore::closeDatabaseHandle(Db *db) {
2128    couchstore_error_t ret = couchstore_close_db(db);
2129    if (ret != COUCHSTORE_SUCCESS) {
2130        LOG(EXTENSION_LOG_WARNING,
2131            "couchstore_close_db failed, error=%s [%s]",
2132            couchstore_strerror(ret), couchkvstore_strerrno(NULL, ret).c_str());
2133    }
2134    st.numClose++;
2135}
2136
2137ENGINE_ERROR_CODE CouchKVStore::couchErr2EngineErr(couchstore_error_t errCode) {
2138    switch (errCode) {
2139    case COUCHSTORE_SUCCESS:
2140        return ENGINE_SUCCESS;
2141    case COUCHSTORE_ERROR_ALLOC_FAIL:
2142        return ENGINE_ENOMEM;
2143    case COUCHSTORE_ERROR_DOC_NOT_FOUND:
2144        return ENGINE_KEY_ENOENT;
2145    case COUCHSTORE_ERROR_NO_SUCH_FILE:
2146    case COUCHSTORE_ERROR_NO_HEADER:
2147    default:
2148        // same as the general error return code of
2149        // EventuallyPersistentStore::getInternal
2150        return ENGINE_TMPFAIL;
2151    }
2152}
2153
2154size_t CouchKVStore::getNumPersistedDeletes(uint16_t vbid) {
2155    size_t delCount = cachedDeleteCount[vbid];
2156    if (delCount != (size_t) -1) {
2157        return delCount;
2158    }
2159
2160    Db *db = NULL;
2161    uint64_t rev = dbFileRevMap[vbid];
2162    couchstore_error_t errCode = openDB(vbid, rev, &db,
2163                                        COUCHSTORE_OPEN_FLAG_RDONLY);
2164    if (errCode == COUCHSTORE_SUCCESS) {
2165        DbInfo info;
2166        errCode = couchstore_db_info(db, &info);
2167        if (errCode == COUCHSTORE_SUCCESS) {
2168            cachedDeleteCount[vbid] = info.deleted_count;
2169            closeDatabaseHandle(db);
2170            return info.deleted_count;
2171        } else {
2172            throw std::runtime_error("CouchKVStore::getNumPersistedDeletes:"
2173                "Failed to read database info for vBucket = " +
2174                std::to_string(vbid) + " rev = " + std::to_string(rev) +
2175                " with error:" + couchstore_strerror(errCode));
2176        }
2177        closeDatabaseHandle(db);
2178    } else {
2179        // open failed - map couchstore error code to exception.
2180        std::errc ec;
2181        switch (errCode) {
2182            case COUCHSTORE_ERROR_OPEN_FILE:
2183                ec = std::errc::no_such_file_or_directory; break;
2184            default:
2185                ec = std::errc::io_error; break;
2186        }
2187        throw std::system_error(std::make_error_code(ec),
2188                                "CouchKVStore::getNumPersistedDeletes:"
2189            "Failed to open database file for vBucket = " +
2190            std::to_string(vbid) + " rev = " + std::to_string(rev) +
2191            " with error:" + couchstore_strerror(errCode));
2192    }
2193    return 0;
2194}
2195
2196DBFileInfo CouchKVStore::getDbFileInfo(uint16_t vbid) {
2197    Db *db = nullptr;
2198    uint64_t rev = dbFileRevMap[vbid];
2199
2200    DBFileInfo vbinfo;
2201
2202    couchstore_error_t errCode = openDB(vbid, rev, &db,
2203                                        COUCHSTORE_OPEN_FLAG_RDONLY);
2204    if (errCode == COUCHSTORE_SUCCESS) {
2205        DbInfo info;
2206        errCode = couchstore_db_info(db, &info);
2207        closeDatabaseHandle(db);
2208        if (errCode == COUCHSTORE_SUCCESS) {
2209            cachedDocCount[vbid] = info.doc_count;
2210            vbinfo.itemCount = info.doc_count;
2211            vbinfo.fileSize = info.file_size;
2212            vbinfo.spaceUsed = info.space_used;
2213        } else {
2214            throw std::runtime_error("CouchKVStore::getDbFileInfo: Failed "
2215                "to read database info for vBucket = " + std::to_string(vbid) +
2216                " rev = " + std::to_string(rev) +
2217                " with error:" + couchstore_strerror(errCode));
2218        }
2219    } else {
2220        // open failed - map couchstore error code to exception.
2221        std::errc ec;
2222        switch (errCode) {
2223            case COUCHSTORE_ERROR_OPEN_FILE:
2224                ec = std::errc::no_such_file_or_directory; break;
2225            default:
2226                ec = std::errc::io_error; break;
2227        }
2228        throw std::system_error(std::make_error_code(ec),
2229                                "CouchKVStore::getDbInfo: failed to open database file for "
2230                                "vBucket = " + std::to_string(vbid) +
2231                                " rev = " + std::to_string(rev) +
2232                                " with error:" + couchstore_strerror(errCode));
2233    }
2234    return vbinfo;
2235}
2236
2237size_t CouchKVStore::getNumItems(uint16_t vbid, uint64_t min_seq,
2238                                 uint64_t max_seq) {
2239    Db *db = NULL;
2240    uint64_t count = 0;
2241    uint64_t rev = dbFileRevMap[vbid];
2242    couchstore_error_t errCode = openDB(vbid, rev, &db,
2243                                        COUCHSTORE_OPEN_FLAG_RDONLY);
2244    if (errCode == COUCHSTORE_SUCCESS) {
2245        errCode = couchstore_changes_count(db, min_seq, max_seq, &count);
2246        if (errCode != COUCHSTORE_SUCCESS) {
2247            throw std::runtime_error("CouchKVStore::getNumItems: Failed to "
2248                "get changes count for vBucket = " + std::to_string(vbid) +
2249                " rev = " + std::to_string(rev) +
2250                " with error:" + couchstore_strerror(errCode));
2251        }
2252        closeDatabaseHandle(db);
2253    } else {
2254        throw std::invalid_argument("CouchKVStore::getNumItems: Failed to "
2255            "open database file for vBucket = " + std::to_string(vbid) +
2256            " rev = " + std::to_string(rev) +
2257            " with error:" + couchstore_strerror(errCode));
2258    }
2259    return count;
2260}
2261
/**
 * Roll vbid back to the newest on-disk header whose last_sequence is
 * <= rollbackSeqno.
 *
 * The current file is opened twice:
 *  - db: read-only, keeps the pre-rollback view and is used for the change
 *    counts;
 *  - newdb: writable; its header is rewound with
 *    couchstore_rewind_db_header() until last_sequence <= rollbackSeqno.
 *
 * If any step fails, or if the rolled-back range holds at least half of
 * all changes, RollbackResult(false, 0, 0, 0) is returned. Per the comments
 * below, that means the vBucket is reset and the entire snapshot is resent.
 * Otherwise:
 *  - cb is given the rewound header;
 *  - items after the rewound header's last_sequence are scanned through cb
 *    (keys only);
 *  - the cached vbstate and doc/delete counts are reloaded from the rewound
 *    header;
 *  - the rewound header is committed to the file.
 *
 * @return on success, RollbackResult(true, highSeqno, lastSnapStart,
 *         lastSnapEnd) taken from cachedVBStates[vbid].
 */
RollbackResult CouchKVStore::rollback(uint16_t vbid, uint64_t rollbackSeqno,
                                      std::shared_ptr<RollbackCB> cb) {
    DbHolder db(this);
    DbInfo info;
    uint64_t fileRev = dbFileRevMap[vbid];
    std::stringstream dbFileName;
    dbFileName << dbname << "/" << vbid << ".couch." << fileRev;
    couchstore_error_t errCode;

    // Read-only handle: the pre-rollback view of the file.
    errCode = openDB(vbid, fileRev, db.getDbAddress(),
                     (uint64_t) COUCHSTORE_OPEN_FLAG_RDONLY);

    if (errCode == COUCHSTORE_SUCCESS) {
        errCode = couchstore_db_info(db.getDb(), &info);
        if (errCode != COUCHSTORE_SUCCESS) {
            LOG(EXTENSION_LOG_WARNING,
                "Failed to read DB info, name=%s",
                dbFileName.str().c_str());
            return RollbackResult(false, 0, 0, 0);
        }
    } else {
        LOG(EXTENSION_LOG_WARNING,
                "Failed to open database, name=%s",
                dbFileName.str().c_str());
        return RollbackResult(false, 0, 0, 0);
    }

    uint64_t latestSeqno = info.last_sequence;

    //Count from latest seq no to 0
    uint64_t totSeqCount = 0;
    errCode = couchstore_changes_count(db.getDb(), 0, latestSeqno, &totSeqCount);
    if (errCode != COUCHSTORE_SUCCESS) {
        LOG(EXTENSION_LOG_WARNING, "Failed to get changes count for "
            "rollback vBucket = %d, rev = %" PRIu64 ", error=%s [%s]",
            vbid, fileRev,  couchstore_strerror(errCode),
            cb_strerror().c_str());
        return RollbackResult(false, 0, 0, 0);
    }

    // Writable handle whose header gets rewound and finally committed.
    DbHolder newdb(this);
    errCode = openDB(vbid, fileRev, newdb.getDbAddress(), 0);
    if (errCode != COUCHSTORE_SUCCESS) {
        LOG(EXTENSION_LOG_WARNING,
                "Failed to open database, name=%s",
                dbFileName.str().c_str());
        return RollbackResult(false, 0, 0, 0);
    }

    // Step back one header at a time; info ends up describing the first
    // header at or below rollbackSeqno.
    while (info.last_sequence > rollbackSeqno) {
        errCode = couchstore_rewind_db_header(newdb.getDb());
        if (errCode != COUCHSTORE_SUCCESS) {
            // rewind_db_header cleans up (frees DB) on error; so
            // release db in DbHolder to prevent a double-free.
            newdb.releaseDb();
            LOG(EXTENSION_LOG_WARNING,
                    "Failed to rewind Db pointer "
                    "for couch file with vbid: %u, whose "
                    "lastSeqno: %" PRIu64 ", while trying to roll back "
                    "to seqNo: %" PRIu64 ", error=%s [%s]",
                    vbid, latestSeqno, rollbackSeqno,
                    couchstore_strerror(errCode), cb_strerror().c_str());
            //Reset the vbucket and send the entire snapshot,
            //as a previous header wasn't found.
            return RollbackResult(false, 0, 0, 0);
        }
        errCode = couchstore_db_info(newdb.getDb(), &info);
        if (errCode != COUCHSTORE_SUCCESS) {
            LOG(EXTENSION_LOG_WARNING,
                "Failed to read DB info, name=%s",
                dbFileName.str().c_str());
            return RollbackResult(false, 0, 0, 0);
        }
    }

    //Count from latest seq no to rollback seq no
    // (measured on the read-only, pre-rollback handle).
    uint64_t rollbackSeqCount = 0;
    errCode = couchstore_changes_count(db.getDb(), info.last_sequence, latestSeqno,
                                       &rollbackSeqCount);
    if (errCode != COUCHSTORE_SUCCESS) {
        LOG(EXTENSION_LOG_WARNING, "Failed to get changes count for "
            "rollback vBucket = %d, rev = %" PRIu64 ", error=%s [%s]",
            vbid, fileRev, couchstore_strerror(errCode), cb_strerror().c_str());
        return RollbackResult(false, 0, 0, 0);
    }

    if ((totSeqCount / 2) <= rollbackSeqCount) {
        //Rolling back at least half of all changes:
        //reset the vbucket and send the entire snapshot instead.
        return RollbackResult(false, 0, 0, 0);
    }

    // Let cb see the rewound header, then scan (keys only) the items after
    // it. NOTE(review): presumably cb uses this header to look up the
    // pre-rollback state of each scanned key; confirm in RollbackCB.
    cb->setDbHeader(newdb.getDb());
    std::shared_ptr<Callback<CacheLookup> > cl(new NoLookupCallback());
    ScanContext* ctx = initScanContext(cb, cl, vbid, info.last_sequence+1,
                                       DocumentFilter::ALL_ITEMS,
                                       ValueFilter::KEYS_ONLY);
    scan_error_t error = scan(ctx);
    destroyScanContext(ctx);

    if (error != scan_success) {
        return RollbackResult(false, 0, 0, 0);
    }

    // NOTE(review): readVBState's return value is ignored. If it returns
    // early (header read failure, unparsable vbstate JSON),
    // cachedVBStates[vbid] is left as it was. It could then be stale or
    // NULL when dereferenced below.
    readVBState(newdb.getDb(), vbid);
    cachedDeleteCount[vbid] = info.deleted_count;
    cachedDocCount[vbid] = info.doc_count;

    //Append the rewinded header to the database file
    errCode = couchstore_commit(newdb.getDb());

    if (errCode != COUCHSTORE_SUCCESS) {
        return RollbackResult(false, 0, 0, 0);
    }

    vbucket_state *vb_state = cachedVBStates[vbid];
    return RollbackResult(true, vb_state->highSeqno,
                          vb_state->lastSnapStart, vb_state->lastSnapEnd);
}
2381
2382int populateAllKeys(Db *db, DocInfo *docinfo, void *ctx) {
2383    AllKeysCtx *allKeysCtx = (AllKeysCtx *)ctx;
2384    uint16_t keylen = docinfo->id.size;
2385    char *key = docinfo->id.buf;
2386    (allKeysCtx->cb)->callback(keylen, key);
2387    if (--(allKeysCtx->count) <= 0) {
2388        //Only when count met is less than the actual number of entries
2389        return COUCHSTORE_ERROR_CANCEL;
2390    }
2391    return COUCHSTORE_SUCCESS;
2392}
2393
2394ENGINE_ERROR_CODE
2395CouchKVStore::getAllKeys(uint16_t vbid, std::string &start_key, uint32_t count,
2396                         std::shared_ptr<Callback<uint16_t&, char*&> > cb) {
2397    Db *db = NULL;
2398    uint64_t rev = dbFileRevMap[vbid];
2399    couchstore_error_t errCode = openDB(vbid, rev, &db,
2400                                        COUCHSTORE_OPEN_FLAG_RDONLY);
2401    if(errCode == COUCHSTORE_SUCCESS) {
2402        sized_buf ref = {NULL, 0};
2403        ref.buf = (char*) start_key.c_str();
2404        ref.size = start_key.size();
2405        AllKeysCtx ctx(cb, count);
2406        errCode = couchstore_all_docs(db, &ref, COUCHSTORE_NO_DELETES,
2407                                      populateAllKeys,
2408                                      static_cast<void *>(&ctx));
2409        closeDatabaseHandle(db);
2410        if (errCode == COUCHSTORE_SUCCESS ||
2411                errCode == COUCHSTORE_ERROR_CANCEL)  {
2412            return ENGINE_SUCCESS;
2413        } else {
2414            LOG(EXTENSION_LOG_WARNING, "couchstore_all_docs failed for "
2415                "database file of vbucket = %d rev = %" PRIu64
2416                ", error=%s [%s]", vbid, rev, couchstore_strerror(errCode),
2417                cb_strerror().c_str());
2418        }
2419    } else {
2420        LOG(EXTENSION_LOG_WARNING, "Failed to open database file for "
2421                "vbucket = %d rev = %" PRIu64 ", errCode = %u", vbid, rev, errCode);
2422
2423    }
2424    return ENGINE_FAILED;
2425}
2426
2427void CouchKVStore::unlinkCouchFile(uint16_t vbucket,
2428                                   uint64_t fRev) {
2429
2430    if (isReadOnly()) {
2431        throw std::logic_error("CouchKVStore::unlinkCouchFile: Not valid on a "
2432                "read-only object.");
2433    }
2434    char fname[PATH_MAX];
2435    try {
2436        checked_snprintf(fname, sizeof(fname), "%s/%d.couch.%" PRIu64,
2437                         dbname.c_str(), vbucket, fRev);
2438    } catch (std::exception& error) {
2439        LOG(EXTENSION_LOG_WARNING,
2440            "CouchKVStore::unlinkCouchFile: Failed to build filename: %s",
2441            fname);
2442        return;
2443    }
2444
2445    if (remove(fname) == -1) {
2446        LOG(EXTENSION_LOG_WARNING, "Failed to remove database file for "
2447            "vbucket = %d rev = %" PRIu64 ", errCode = %u", vbucket, fRev,
2448            errno);
2449
2450        if (errno != ENOENT) {
2451            std::string file_str = fname;
2452            pendingFileDeletions.push(file_str);
2453        }
2454    }
2455}
2456
2457void CouchKVStore::removeCompactFile(const std::string &dbname,
2458                                     uint16_t vbid,
2459                                     uint64_t fileRev) {
2460
2461    std::string dbfile = getDBFileName(dbname, vbid, fileRev);
2462    std::string compact_file = dbfile + ".compact";
2463
2464    if (!isReadOnly()) {
2465        removeCompactFile(compact_file);
2466    } else {
2467        LOG(EXTENSION_LOG_WARNING,
2468            "A read-only instance of the underlying store was not allowed "
2469            "to delete a temporary file: %s", compact_file.c_str());
2470    }
2471}
2472
2473void CouchKVStore::removeCompactFile(const std::string &filename) {
2474    if (isReadOnly()) {
2475        throw std::logic_error("CouchKVStore::removeCompactFile: Not valid on "
2476                "a read-only object.");
2477    }
2478
2479    if (access(filename.c_str(), F_OK) == 0) {
2480        if (remove(filename.c_str()) == 0) {
2481            LOG(EXTENSION_LOG_WARNING,
2482                "Removed compact file '%s'", filename.c_str());
2483        }
2484        else {
2485            LOG(EXTENSION_LOG_WARNING,
2486                "Warning: Failed to remove compact file '%s': %s",
2487                filename.c_str(), cb_strerror().c_str());
2488
2489            if (errno != ENOENT) {
2490                pendingFileDeletions.push(const_cast<std::string &>(filename));
2491            }
2492        }
2493    }
2494}
2495
2496/* end of couch-kvstore.cc */
2497