1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2012 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "config.h"
19
20#ifdef _MSC_VER
21#define PATH_MAX MAX_PATH
22#include <direct.h>
23#define mkdir(a, b) _mkdir(a)
24#endif
25
26#include <fcntl.h>
27#include <stdio.h>
28#include <string.h>
29#include <sys/stat.h>
30#include <sys/types.h>
31
32#include <algorithm>
33#include <cctype>
34#include <cstdlib>
35#include <fstream>
36#include <iostream>
37#include <list>
38#include <map>
39#include <string>
40#include <utility>
41#include <vector>
42#include <platform/dirutils.h>
43#include <cJSON.h>
44
45#include "common.h"
46#include "couch-kvstore/couch-kvstore.h"
47#define STATWRITER_NAMESPACE couchstore_engine
48#include "statwriter.h"
49#undef STATWRITER_NAMESPACE
50
51#include <JSON_checker.h>
52#include <snappy-c.h>
53
54using namespace CouchbaseDirectoryUtilities;
55
56static const int MUTATION_FAILED = -1;
57static const int DOC_NOT_FOUND = 0;
58static const int MUTATION_SUCCESS = 1;
59
60static const int MAX_OPEN_DB_RETRY = 10;
61
62static const uint32_t DEFAULT_META_LEN = 16;
63
64class NoLookupCallback : public Callback<CacheLookup> {
65public:
66    NoLookupCallback() {}
67    ~NoLookupCallback() {}
68    void callback(CacheLookup&) {}
69};
70
71class NoRangeCallback : public Callback<SeqnoRange> {
72public:
73    NoRangeCallback() {}
74    ~NoRangeCallback() {}
75    void callback(SeqnoRange&) {}
76};
77
78extern "C" {
79    static int recordDbDumpC(Db *db, DocInfo *docinfo, void *ctx)
80    {
81        return CouchKVStore::recordDbDump(db, docinfo, ctx);
82    }
83}
84
85extern "C" {
86    static int getMultiCbC(Db *db, DocInfo *docinfo, void *ctx)
87    {
88        return CouchKVStore::getMultiCb(db, docinfo, ctx);
89    }
90}
91
92static std::string getStrError(Db *db) {
93    const size_t max_msg_len = 256;
94    char msg[max_msg_len];
95    couchstore_last_os_error(db, msg, max_msg_len);
96    std::string errorStr(msg);
97    return errorStr;
98}
99
/**
 * Describe the calling thread's most recent system error.
 *
 * The error code is captured before anything else runs so that no
 * intermediate call (stream construction, formatting) can overwrite it.
 *
 * @return a string of the form "errno = <code>: '<message>'"
 */
static std::string getSystemStrerror(void) {
#ifdef WIN32
    const DWORD err = GetLastError();
    char* win_msg = NULL;
    // FormatMessageA always takes a narrow (LPSTR) buffer, independent of
    // whether the build defines UNICODE.
    FormatMessageA(FORMAT_MESSAGE_ALLOCATE_BUFFER |
                   FORMAT_MESSAGE_FROM_SYSTEM |
                   FORMAT_MESSAGE_IGNORE_INSERTS,
                   NULL, err,
                   MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
                   (LPSTR) &win_msg,
                   0, NULL);
    std::stringstream ss;
    // win_msg stays NULL when FormatMessageA fails; never stream a NULL
    // char pointer.
    ss << "errno = " << err << ": '"
       << (win_msg ? win_msg : "unknown error") << "'";
    if (win_msg) {
        LocalFree(win_msg);
    }
#else
    const int err = errno;
    std::stringstream ss;
    ss << "errno = " << err << ": '" << strerror(err) << "'";
#endif

    return ss.str();
}
120
121static uint8_t determine_datatype(const unsigned char* value,
122                                  size_t length) {
123    if (checkUTF8JSON(value, length)) {
124        return PROTOCOL_BINARY_DATATYPE_JSON;
125    } else {
126        return PROTOCOL_BINARY_RAW_BYTES;
127    }
128}
129
130static const std::string getJSONObjString(const cJSON *i) {
131    if (i == NULL) {
132        return "";
133    }
134    if (i->type != cJSON_String) {
135        abort();
136    }
137    return i->valuestring;
138}
139
/**
 * Tell whether a file name ends with the ".compact" suffix used for
 * temporary compaction output.
 *
 * The suffix length excludes the literal's NUL terminator, and only the
 * tail of the name is compared, so names that merely contain ".compact"
 * elsewhere (or are shorter than the suffix) are rejected.
 *
 * @param filename name to test
 * @return true if and only if filename ends with ".compact"
 */
static bool endWithCompact(const std::string &filename) {
    static const char suffix[] = ".compact";
    const size_t suffixLen = sizeof(suffix) - 1; // drop the trailing NUL

    if (filename.size() < suffixLen) {
        return false;
    }
    return filename.compare(filename.size() - suffixLen,
                            suffixLen, suffix) == 0;
}
148
149static void discoverDbFiles(const std::string &dir,
150                            std::vector<std::string> &v) {
151    std::vector<std::string> files = findFilesContaining(dir, ".couch");
152    std::vector<std::string>::iterator ii;
153    for (ii = files.begin(); ii != files.end(); ++ii) {
154        if (!endWithCompact(*ii)) {
155            v.push_back(*ii);
156        }
157    }
158}
159
160static uint64_t computeMaxDeletedSeqNum(DocInfo **docinfos,
161                                        const int numdocs) {
162    uint64_t max = 0;
163    for (int idx = 0; idx < numdocs; idx++) {
164        if (docinfos[idx]->deleted) {
165            // check seq number only from a deleted file
166            uint64_t seqNum = docinfos[idx]->rev_seq;
167            max = std::max(seqNum, max);
168        }
169    }
170    return max;
171}
172
173static int getMutationStatus(couchstore_error_t errCode) {
174    switch (errCode) {
175    case COUCHSTORE_SUCCESS:
176        return MUTATION_SUCCESS;
177    case COUCHSTORE_ERROR_NO_HEADER:
178    case COUCHSTORE_ERROR_NO_SUCH_FILE:
179    case COUCHSTORE_ERROR_DOC_NOT_FOUND:
180        // this return causes ep engine to drop the failed flush
181        // of an item since it does not know about the itme any longer
182        return DOC_NOT_FOUND;
183    default:
184        // this return causes ep engine to keep requeuing the failed
185        // flush of an item
186        return MUTATION_FAILED;
187    }
188}
189
/**
 * Check that a string consists solely of decimal digits.
 *
 * Each character is converted to unsigned char before being passed to
 * isdigit(); handing isdigit() a negative char value (any byte >= 0x80
 * where char is signed) is undefined behaviour.
 *
 * @param input string to inspect (not modified)
 * @return true if every character is a digit; true for an empty string
 */
static bool allDigit(std::string &input) {
    for (std::string::const_iterator ch = input.begin();
         ch != input.end(); ++ch) {
        if (!isdigit(static_cast<unsigned char>(*ch))) {
            return false;
        }
    }
    return true;
}
199
200static std::string couchkvstore_strerrno(Db *db, couchstore_error_t err) {
201    return (err == COUCHSTORE_ERROR_OPEN_FILE ||
202            err == COUCHSTORE_ERROR_READ ||
203            err == COUCHSTORE_ERROR_WRITE) ? getStrError(db) : "none";
204}
205
206struct GetMultiCbCtx {
207    GetMultiCbCtx(CouchKVStore &c, uint16_t v, vb_bgfetch_queue_t &f) :
208        cks(c), vbId(v), fetches(f) {}
209
210    CouchKVStore &cks;
211    uint16_t vbId;
212    vb_bgfetch_queue_t &fetches;
213};
214
215struct StatResponseCtx {
216public:
217    StatResponseCtx(std::map<std::pair<uint16_t, uint16_t>, vbucket_state> &sm,
218                    uint16_t vb) : statMap(sm), vbId(vb) {
219        /* EMPTY */
220    }
221
222    std::map<std::pair<uint16_t, uint16_t>, vbucket_state> &statMap;
223    uint16_t vbId;
224};
225
226struct LoadResponseCtx {
227    shared_ptr<Callback<GetValue> > callback;
228    shared_ptr<Callback<CacheLookup> > lookup;
229    uint16_t vbucketId;
230    bool keysonly;
231    EPStats *stats;
232};
233
234struct AllKeysCtx {
235    AllKeysCtx(AllKeysCB *callback, uint32_t cnt) :
236        cb(callback), count(cnt) { }
237
238    AllKeysCB *cb;
239    uint32_t count;
240};
241
242CouchRequest::CouchRequest(const Item &it, uint64_t rev,
243                           CouchRequestCallback &cb, bool del) :
244    value(it.getValue()), vbucketId(it.getVBucketId()), fileRevNum(rev),
245    key(it.getKey()), deleteItem(del)
246{
247    uint64_t cas = htonll(it.getCas());
248    uint32_t flags = it.getFlags();
249    uint32_t vlen = it.getNBytes();
250    uint32_t exptime = it.getExptime();
251
252    // Datatype used to determine whether document requires compression or not
253    uint8_t datatype;
254
255    // Save time of deletion in expiry time field of deleted item's metadata.
256    if (del) {
257        exptime = ep_real_time();
258    }
259    exptime = htonl(exptime);
260
261    dbDoc.id.buf = const_cast<char *>(key.c_str());
262    dbDoc.id.size = it.getNKey();
263    if (vlen) {
264        dbDoc.data.buf = const_cast<char *>(value->getData());
265        dbDoc.data.size = vlen;
266        datatype = it.getDataType();
267    } else {
268        dbDoc.data.buf = NULL;
269        dbDoc.data.size = 0;
270        datatype = 0x00;
271    }
272
273    memset(meta, 0, sizeof(meta));
274    memcpy(meta, &cas, 8);
275    memcpy(meta + 8, &exptime, 4);
276    memcpy(meta + 12, &flags, 4);
277    *(meta + DEFAULT_META_LEN) = FLEX_META_CODE;
278    memcpy(meta + DEFAULT_META_LEN + FLEX_DATA_OFFSET, it.getExtMeta(),
279           it.getExtMetaLen());
280
281    dbDocInfo.db_seq = it.getBySeqno();
282    dbDocInfo.rev_meta.buf = reinterpret_cast<char *>(meta);
283    dbDocInfo.rev_meta.size = COUCHSTORE_METADATA_SIZE;
284    dbDocInfo.rev_seq = it.getRevSeqno();
285    dbDocInfo.size = dbDoc.data.size;
286    if (del) {
287        dbDocInfo.deleted =  1;
288        callback.delCb = cb.delCb;
289    } else {
290        dbDocInfo.deleted = 0;
291        callback.setCb = cb.setCb;
292    }
293    dbDocInfo.id = dbDoc.id;
294    dbDocInfo.content_meta = (datatype == PROTOCOL_BINARY_DATATYPE_JSON) ?
295                                    COUCH_DOC_IS_JSON : COUCH_DOC_NON_JSON_MODE;
296
297    //Compress only those documents that aren't already compressed.
298    if (dbDoc.data.size > 0 && !deleteItem) {
299        if (datatype == PROTOCOL_BINARY_RAW_BYTES ||
300                datatype == PROTOCOL_BINARY_DATATYPE_JSON) {
301            dbDocInfo.content_meta |= COUCH_DOC_IS_COMPRESSED;
302        }
303    }
304    start = gethrtime();
305}
306
307CouchKVStore::CouchKVStore(EPStats &stats, Configuration &config, bool read_only) :
308    KVStore(read_only), epStats(stats), configuration(config),
309    dbname(configuration.getDbname()), intransaction(false),
310    dbFileRevMapPopulated(false)
311{
312    open();
313    statCollectingFileOps = getCouchstoreStatsOps(&st.fsStats);
314
315    // init db file map with default revision number, 1
316    numDbFiles = static_cast<uint16_t>(configuration.getMaxVbuckets());
317    cachedVBStates.reserve(numDbFiles);
318    for (uint16_t i = 0; i < numDbFiles; i++) {
319        // pre-allocate to avoid rehashing for safe read-only operations
320        dbFileRevMap.push_back(1);
321        cachedDocCount[i] = (size_t)-1;
322        cachedDeleteCount[i] = (size_t)-1;
323        cachedVBStates.push_back((vbucket_state *)NULL);
324    }
325}
326
327CouchKVStore::CouchKVStore(const CouchKVStore &copyFrom) :
328    KVStore(copyFrom), epStats(copyFrom.epStats),
329    configuration(copyFrom.configuration),
330    dbname(copyFrom.dbname), dbFileRevMap(copyFrom.dbFileRevMap),
331    numDbFiles(copyFrom.numDbFiles),
332    intransaction(false),
333    dbFileRevMapPopulated(copyFrom.dbFileRevMapPopulated)
334{
335    open();
336    statCollectingFileOps = getCouchstoreStatsOps(&st.fsStats);
337}
338
339CouchKVStore::~CouchKVStore() {
340    close();
341
342    for (std::vector<vbucket_state *>::iterator it = cachedVBStates.begin();
343         it != cachedVBStates.end(); it++) {
344        vbucket_state *vbstate = *it;
345        if (vbstate) {
346            delete vbstate;
347            *it = NULL;
348        }
349    }
350
351}
352void CouchKVStore::reset(uint16_t vbucketId) {
353    cb_assert(!isReadOnly());
354
355    vbucket_state *state = cachedVBStates[vbucketId];
356    if (state) {
357        state->checkpointId = 0;
358        state->maxDeletedSeqno = 0;
359        state->highSeqno = 0;
360        state->lastSnapStart = 0;
361        state->lastSnapEnd = 0;
362
363        // Unlink the couchstore file upon reset
364        unlinkCouchFile(vbucketId, dbFileRevMap[vbucketId]);
365
366        resetVBucket(vbucketId, *state);
367        updateDbFileMap(vbucketId, 1);
368    } else {
369        LOG(EXTENSION_LOG_WARNING, "No entry in cached states "
370                "for vbucket %u", vbucketId);
371        cb_assert(false);
372    }
373}
374
375void CouchKVStore::set(const Item &itm, Callback<mutation_result> &cb) {
376    cb_assert(!isReadOnly());
377    cb_assert(intransaction);
378    bool deleteItem = false;
379    CouchRequestCallback requestcb;
380    std::string dbFile;
381    uint64_t fileRev = dbFileRevMap[itm.getVBucketId()];
382
383    // each req will be de-allocated after commit
384    requestcb.setCb = &cb;
385    CouchRequest *req = new CouchRequest(itm, fileRev, requestcb, deleteItem);
386    pendingReqsQ.push_back(req);
387}
388
389void CouchKVStore::get(const std::string &key, uint64_t, uint16_t vb,
390                       Callback<GetValue> &cb, bool fetchDelete) {
391    Db *db = NULL;
392    std::string dbFile;
393    GetValue rv;
394    uint64_t fileRev = dbFileRevMap[vb];
395
396    couchstore_error_t errCode = openDB(vb, fileRev, &db,
397                                        COUCHSTORE_OPEN_FLAG_RDONLY);
398    if (errCode != COUCHSTORE_SUCCESS) {
399        ++st.numGetFailure;
400        LOG(EXTENSION_LOG_WARNING,
401            "Warning: failed to open database to retrieve data "
402            "from vBucketId = %d, key = %s, file = %s\n",
403            vb, key.c_str(), dbFile.c_str());
404        rv.setStatus(couchErr2EngineErr(errCode));
405        cb.callback(rv);
406        return;
407    }
408
409    getWithHeader(db, key, vb, cb, fetchDelete);
410    closeDatabaseHandle(db);
411}
412
413void CouchKVStore::getWithHeader(void *dbHandle, const std::string &key,
414                                 uint16_t vb, Callback<GetValue> &cb,
415                                 bool fetchDelete) {
416
417    Db *db = (Db *)dbHandle;
418    hrtime_t start = gethrtime();
419    RememberingCallback<GetValue> *rc = dynamic_cast<RememberingCallback<GetValue> *>(&cb);
420    bool getMetaOnly = rc && rc->val.isPartial();
421    DocInfo *docInfo = NULL;
422    sized_buf id;
423    GetValue rv;
424    std::string dbFile;
425
426    id.size = key.size();
427    id.buf = const_cast<char *>(key.c_str());
428    couchstore_error_t errCode = couchstore_docinfo_by_id(db, (uint8_t *)id.buf,
429                                                          id.size, &docInfo);
430    if (errCode != COUCHSTORE_SUCCESS) {
431        if (!getMetaOnly) {
432            // log error only if this is non-xdcr case
433            LOG(EXTENSION_LOG_WARNING,
434                "Warning: failed to retrieve doc info from "
435                "database, name=%s key=%s error=%s [%s]\n",
436                dbFile.c_str(), id.buf, couchstore_strerror(errCode),
437                couchkvstore_strerrno(db, errCode).c_str());
438        }
439    } else {
440        cb_assert(docInfo);
441        errCode = fetchDoc(db, docInfo, rv, vb, getMetaOnly, fetchDelete);
442        if (errCode != COUCHSTORE_SUCCESS) {
443            LOG(EXTENSION_LOG_WARNING,
444                "Warning: failed to retrieve key value from "
445                "database, name=%s key=%s error=%s [%s] "
446                "deleted=%s", dbFile.c_str(), id.buf,
447                couchstore_strerror(errCode),
448                couchkvstore_strerrno(db, errCode).c_str(),
449                docInfo->deleted ? "yes" : "no");
450        }
451
452        // record stats
453        st.readTimeHisto.add((gethrtime() - start) / 1000);
454        if (errCode == COUCHSTORE_SUCCESS) {
455            st.readSizeHisto.add(key.length() + rv.getValue()->getNBytes());
456        }
457    }
458
459    if(errCode != COUCHSTORE_SUCCESS) {
460        ++st.numGetFailure;
461    }
462
463    couchstore_free_docinfo(docInfo);
464    rv.setStatus(couchErr2EngineErr(errCode));
465    cb.callback(rv);
466}
467
468void CouchKVStore::getMulti(uint16_t vb, vb_bgfetch_queue_t &itms) {
469    std::string dbFile;
470    int numItems = itms.size();
471    uint64_t fileRev = dbFileRevMap[vb];
472
473    Db *db = NULL;
474    couchstore_error_t errCode = openDB(vb, fileRev, &db,
475                                        COUCHSTORE_OPEN_FLAG_RDONLY);
476    if (errCode != COUCHSTORE_SUCCESS) {
477        LOG(EXTENSION_LOG_WARNING,
478            "Warning: failed to open database for data fetch, "
479            "vBucketId = %d file = %s numDocs = %d\n",
480            vb, dbFile.c_str(), numItems);
481        st.numGetFailure.fetch_add(numItems);
482        vb_bgfetch_queue_t::iterator itr = itms.begin();
483        for (; itr != itms.end(); ++itr) {
484            std::list<VBucketBGFetchItem *> &fetches = (*itr).second;
485            std::list<VBucketBGFetchItem *>::iterator fitr = fetches.begin();
486            for (; fitr != fetches.end(); ++fitr) {
487                (*fitr)->value.setStatus(ENGINE_NOT_MY_VBUCKET);
488            }
489        }
490        return;
491    }
492
493    size_t idx = 0;
494    sized_buf *ids = new sized_buf[itms.size()];
495    vb_bgfetch_queue_t::iterator itr = itms.begin();
496    for (; itr != itms.end(); ++itr) {
497        ids[idx].size = itr->first.size();
498        ids[idx].buf = const_cast<char *>(itr->first.c_str());
499        ++idx;
500    }
501
502    GetMultiCbCtx ctx(*this, vb, itms);
503    errCode = couchstore_docinfos_by_id(db, ids, itms.size(),
504                                        0, getMultiCbC, &ctx);
505    if (errCode != COUCHSTORE_SUCCESS) {
506        st.numGetFailure.fetch_add(numItems);
507        for (itr = itms.begin(); itr != itms.end(); ++itr) {
508            LOG(EXTENSION_LOG_WARNING, "Warning: failed to read database by"
509                " vBucketId = %d key = %s file = %s error = %s [%s]\n",
510                vb, (*itr).first.c_str(),
511                dbFile.c_str(), couchstore_strerror(errCode),
512                couchkvstore_strerrno(db, errCode).c_str());
513            std::list<VBucketBGFetchItem *> &fetches = (*itr).second;
514            std::list<VBucketBGFetchItem *>::iterator fitr = fetches.begin();
515            for (; fitr != fetches.end(); ++fitr) {
516                (*fitr)->value.setStatus(couchErr2EngineErr(errCode));
517            }
518        }
519    }
520    closeDatabaseHandle(db);
521    delete []ids;
522}
523
524void CouchKVStore::del(const Item &itm,
525                       Callback<int> &cb) {
526    cb_assert(!isReadOnly());
527    cb_assert(intransaction);
528    uint16_t fileRev = dbFileRevMap[itm.getVBucketId()];
529    CouchRequestCallback requestcb;
530    requestcb.delCb = &cb;
531    CouchRequest *req = new CouchRequest(itm, fileRev, requestcb, true);
532    pendingReqsQ.push_back(req);
533}
534
535void CouchKVStore::delVBucket(uint16_t vbucket) {
536    cb_assert(!isReadOnly());
537
538    unlinkCouchFile(vbucket, dbFileRevMap[vbucket]);
539
540    if (cachedVBStates[vbucket]) {
541        delete cachedVBStates[vbucket];
542    }
543
544    std::string failovers("[{\"id\":0, \"seq\":0}]");
545    cachedVBStates[vbucket] = new vbucket_state(vbucket_state_dead, 0, 0, 0, 0,
546                                                0, 0, failovers);
547    updateDbFileMap(vbucket, 1);
548}
549
550std::vector<vbucket_state *> CouchKVStore::listPersistedVbuckets() {
551    std::vector<std::string> files;
552    std::vector<uint16_t> vbids;
553
554    // warmup, first discover db files from local directory
555    discoverDbFiles(dbname, files);
556    populateFileNameMap(files, &vbids);
557
558    for (std::vector<vbucket_state *>::iterator it = cachedVBStates.begin();
559         it != cachedVBStates.end(); it++) {
560        vbucket_state *vbstate = *it;
561        if (vbstate) {
562            delete vbstate;
563            *it = NULL;
564        }
565    }
566
567    Db *db = NULL;
568    couchstore_error_t errorCode;
569
570    std::vector<uint16_t>::iterator itr = vbids.begin();
571    for (; itr != vbids.end(); ++itr) {
572        uint16_t id = *itr;
573        uint64_t rev = dbFileRevMap[id];
574
575        errorCode = openDB(id, rev, &db, COUCHSTORE_OPEN_FLAG_RDONLY);
576        if (errorCode == COUCHSTORE_SUCCESS) {
577            readVBState(db, id);
578            /* update stat */
579            ++st.numLoadedVb;
580            closeDatabaseHandle(db);
581        } else {
582            std::stringstream revnum, vbid;
583            revnum  << rev;
584            vbid << id;
585            std::string fileName =
586                dbname + "/" + vbid.str() + ".couch." + revnum.str();
587            LOG(EXTENSION_LOG_WARNING,
588                "WARN: failed to open database file, name=%s\n",
589                fileName.c_str());
590            remVBucketFromDbFileMap(id);
591        }
592
593        db = NULL;
594        removeCompactFile(dbname, id, rev);
595    }
596
597    return cachedVBStates;
598}
599
600void CouchKVStore::getPersistedStats(std::map<std::string,
601                                     std::string> &stats) {
602    char *buffer = NULL;
603    std::string fname = dbname + "/stats.json";
604    if (access(fname.c_str(), R_OK) == -1) {
605        return ;
606    }
607
608    std::ifstream session_stats;
609    session_stats.exceptions (session_stats.failbit | session_stats.badbit);
610    try {
611        session_stats.open(fname.c_str(), std::ios::binary);
612        session_stats.seekg(0, std::ios::end);
613        int flen = session_stats.tellg();
614        if (flen < 0) {
615            LOG(EXTENSION_LOG_WARNING,
616                "Warning: error in session stats ifstream!!!");
617            session_stats.close();
618            return;
619        }
620        session_stats.seekg(0, std::ios::beg);
621        buffer = new char[flen + 1];
622        session_stats.read(buffer, flen);
623        session_stats.close();
624        buffer[flen] = '\0';
625
626        cJSON *json_obj = cJSON_Parse(buffer);
627        if (!json_obj) {
628            LOG(EXTENSION_LOG_WARNING,
629                "Warning: failed to parse the session stats json doc!!!");
630            delete[] buffer;
631            return;
632        }
633
634        int json_arr_size = cJSON_GetArraySize(json_obj);
635        for (int i = 0; i < json_arr_size; ++i) {
636            cJSON *obj = cJSON_GetArrayItem(json_obj, i);
637            if (obj) {
638                stats[obj->string] = obj->valuestring ? obj->valuestring : "";
639            }
640        }
641        cJSON_Delete(json_obj);
642
643    } catch (const std::ifstream::failure &e) {
644        LOG(EXTENSION_LOG_WARNING,
645            "Warning: failed to load the engine session stats "
646            " due to IO exception \"%s\"", e.what());
647    } catch (...) {
648        LOG(EXTENSION_LOG_WARNING,
649            "Warning: failed to load the engine session stats "
650            " due to IO exception");
651    }
652
653    delete[] buffer;
654}
655
/**
 * Compose the path of a vbucket database file.
 *
 * @param dbname database directory
 * @param vbid   vbucket id
 * @param rev    file revision number
 * @return "<dbname>/<vbid>.couch.<rev>"
 */
static std::string getDBFileName(const std::string &dbname,
                                 uint16_t vbid,
                                 uint64_t rev) {
    std::stringstream path;
    path << dbname;
    path << "/" << vbid;
    path << ".couch." << rev;
    return path.str();
}
663
664static int edit_docinfo_hook(DocInfo **info, const sized_buf *item) {
665    if ((*info)->rev_meta.size == DEFAULT_META_LEN) {
666        const unsigned char* data;
667        bool ret;
668        if (((*info)->content_meta | COUCH_DOC_IS_COMPRESSED) ==
669                (*info)->content_meta) {
670            size_t uncompr_len;
671            snappy_uncompressed_length(item->buf, item->size, &uncompr_len);
672            char *dbuf = (char *) malloc(uncompr_len);
673            snappy_uncompress(item->buf, item->size, dbuf, &uncompr_len);
674            data = (const unsigned char*)dbuf;
675            ret = checkUTF8JSON(data, uncompr_len);
676            free(dbuf);
677        } else {
678            data = (const unsigned char*)item->buf;
679            ret = checkUTF8JSON(data, item->size);
680        }
681        uint8_t flex_code = FLEX_META_CODE;
682        uint8_t datatype;
683        if (ret) {
684            datatype = PROTOCOL_BINARY_DATATYPE_JSON;
685        } else {
686            datatype = PROTOCOL_BINARY_RAW_BYTES;
687        }
688
689        DocInfo *docinfo = (DocInfo *) calloc (sizeof(DocInfo) +
690                                               (*info)->id.size +
691                                               (*info)->rev_meta.size +
692                                               FLEX_DATA_OFFSET + EXT_META_LEN,
693                                               sizeof(uint8_t));
694        if (!docinfo) {
695            LOG(EXTENSION_LOG_WARNING, "Failed to allocate docInfo, "
696                    "while editing docinfo in the compaction's docinfo_hook");
697            return 0;
698        }
699
700        char *extra = (char *)docinfo + sizeof(DocInfo);
701        memcpy(extra, (*info)->id.buf, (*info)->id.size);
702        docinfo->id.buf = extra;
703        docinfo->id.size = (*info)->id.size;
704
705        extra += (*info)->id.size;
706        memcpy(extra, (*info)->rev_meta.buf, (*info)->rev_meta.size);
707        memcpy(extra + (*info)->rev_meta.size,
708               &flex_code, FLEX_DATA_OFFSET);
709        memcpy(extra + (*info)->rev_meta.size + FLEX_DATA_OFFSET,
710               &datatype, sizeof(uint8_t));
711        docinfo->rev_meta.buf = extra;
712        docinfo->rev_meta.size = (*info)->rev_meta.size +
713                                 FLEX_DATA_OFFSET + EXT_META_LEN;
714
715        docinfo->db_seq = (*info)->db_seq;
716        docinfo->rev_seq = (*info)->rev_seq;
717        docinfo->deleted = (*info)->deleted;
718        docinfo->content_meta = (*info)->content_meta;
719        docinfo->bp = (*info)->bp;
720        docinfo->size = (*info)->size;
721
722        couchstore_free_docinfo(*info);
723        *info = docinfo;
724        return 1;
725    }
726    return 0;
727}
728
729static int time_purge_hook(Db* d, DocInfo* info, void* ctx_p) {
730    compaction_ctx* ctx = (compaction_ctx*) ctx_p;
731    DbInfo infoDb;
732
733    couchstore_db_info(d, &infoDb);
734    //Compaction finished
735    if (info == NULL) {
736        return couchstore_set_purge_seq(d, ctx->max_purged_seq);
737    }
738
739    if (info->rev_meta.size >= DEFAULT_META_LEN) {
740        uint32_t exptime;
741        memcpy(&exptime, info->rev_meta.buf + 8, 4);
742        exptime = ntohl(exptime);
743        if (info->deleted) {
744            if (info->db_seq != infoDb.last_sequence) {
745                if (ctx->drop_deletes) { // all deleted items must be dropped ...
746                    if (ctx->max_purged_seq < info->db_seq) {
747                        ctx->max_purged_seq = info->db_seq; // track max_purged_seq
748                    }
749                    return COUCHSTORE_COMPACT_DROP_ITEM;      // ...unconditionally
750                }
751                if (exptime < ctx->purge_before_ts &&
752                        (!ctx->purge_before_seq ||
753                         info->db_seq <= ctx->purge_before_seq)) {
754                    if (ctx->max_purged_seq < info->db_seq) {
755                        ctx->max_purged_seq = info->db_seq;
756                    }
757                    return COUCHSTORE_COMPACT_DROP_ITEM;
758                }
759            }
760        } else if (exptime && exptime < ctx->curr_time) {
761            std::string keyStr(info->id.buf, info->id.size);
762            expiredItemCtx expItem = { info->rev_seq, keyStr };
763            ctx->expiredItems.push_back(expItem);
764        }
765    }
766
767    return COUCHSTORE_COMPACT_KEEP_ITEM;
768}
769
770void CouchKVStore::compactVBucket(const uint16_t vbid,
771                                  compaction_ctx *hook_ctx,
772                                  Callback<compaction_ctx> &cb,
773                                  Callback<kvstats_ctx> &kvcb) {
774    couchstore_compact_hook       hook = time_purge_hook;
775    couchstore_docinfo_hook      dhook = edit_docinfo_hook;
776    const couch_file_ops     *def_iops = couchstore_get_default_file_ops();
777    Db                      *compactdb = NULL;
778    Db                       *targetDb = NULL;
779    uint64_t                   fileRev = dbFileRevMap[vbid];
780    uint64_t                   new_rev = fileRev + 1;
781    couchstore_error_t         errCode = COUCHSTORE_SUCCESS;
782    hrtime_t                     start = gethrtime();
783    uint64_t              newHeaderPos = 0;
784    std::string                 dbfile;
785    std::string           compact_file;
786    std::string               new_file;
787    kvstats_ctx                  kvctx;
788    DbInfo                        info;
789
790    // Open the source VBucket database file ...
791    errCode = openDB(vbid, fileRev, &compactdb,
792                     (uint64_t)COUCHSTORE_OPEN_FLAG_RDONLY, NULL);
793    if (errCode != COUCHSTORE_SUCCESS) {
794        LOG(EXTENSION_LOG_WARNING,
795                "Warning: failed to open database, vbucketId = %d "
796                "fileRev = %llu", vbid, fileRev);
797        return;
798    }
799
800    // Build the temporary vbucket.compact file name
801    dbfile       = getDBFileName(dbname, vbid, fileRev);
802    compact_file = dbfile + ".compact";
803
804    // Perform COMPACTION of vbucket.couch.rev into vbucket.couch.rev.compact
805    errCode = couchstore_compact_db_ex(compactdb, compact_file.c_str(), 0,
806                                       hook, dhook, hook_ctx, def_iops);
807    if (errCode != COUCHSTORE_SUCCESS) {
808        LOG(EXTENSION_LOG_WARNING,
809            "Warning: failed to compact database with name=%s "
810            "error=%s errno=%s",
811            dbfile.c_str(),
812            couchstore_strerror(errCode),
813            couchkvstore_strerrno(compactdb, errCode).c_str());
814        closeDatabaseHandle(compactdb);
815        return;
816    }
817
818    // Close the source Database File once compaction is done
819    closeDatabaseHandle(compactdb);
820
821    // Rename the .compact file to one with the next revision number
822    new_file = getDBFileName(dbname, vbid, new_rev);
823    if (rename(compact_file.c_str(), new_file.c_str()) != 0) {
824        LOG(EXTENSION_LOG_WARNING,
825            "Warning: failed to rename '%s' to '%s': %s",
826            compact_file.c_str(), new_file.c_str(),
827            getSystemStrerror().c_str());
828
829        removeCompactFile(compact_file);
830        return;
831    }
832
833    // Open the newly compacted VBucket database file ...
834    errCode = openDB(vbid, new_rev, &targetDb,
835                     (uint64_t)COUCHSTORE_OPEN_FLAG_RDONLY, NULL);
836    if (errCode != COUCHSTORE_SUCCESS) {
837        LOG(EXTENSION_LOG_WARNING,
838                "Warning: failed to open compacted database file %s "
839                "fileRev = %llu", new_file.c_str(), new_rev);
840        if (remove(new_file.c_str()) != 0) {
841            LOG(EXTENSION_LOG_WARNING, NULL,
842                "Warning: Failed to remove '%s': %s",
843                new_file.c_str(), getSystemStrerror().c_str());
844        }
845        return;
846    }
847
848    // Update the global VBucket file map so all operations use the new file
849    updateDbFileMap(vbid, new_rev);
850
851    LOG(EXTENSION_LOG_INFO,
852            "INFO: created new couch db file, name=%s rev=%llu",
853            new_file.c_str(), new_rev);
854
855    // Update stats to caller
856    kvctx.vbucket = vbid;
857    couchstore_db_info(targetDb, &info);
858    kvctx.fileSpaceUsed = info.space_used;
859    kvctx.fileSize = info.file_size;
860    kvcb.callback(kvctx);
861
862    // also update cached state with dbinfo
863    vbucket_state *state = cachedVBStates[vbid];
864    if (state) {
865        state->highSeqno = info.last_sequence;
866        state->purgeSeqno = info.purge_seq;
867        cachedDeleteCount[vbid] = info.deleted_count;
868        cachedDocCount[vbid] = info.doc_count;
869    }
870
871    // Notify MCCouch that compaction is Done...
872    newHeaderPos = couchstore_get_header_position(targetDb);
873    closeDatabaseHandle(targetDb);
874
875    if (hook_ctx->expiredItems.size()) {
876        cb.callback(*hook_ctx);
877    }
878
879    // Removing the stale couch file
880    unlinkCouchFile(vbid, fileRev);
881
882    st.compactHisto.add((gethrtime() - start) / 1000);
883}
884
885bool CouchKVStore::snapshotVBucket(uint16_t vbucketId, vbucket_state &vbstate,
886                                   Callback<kvstats_ctx> *cb) {
887    cb_assert(!isReadOnly());
888
889    vbucket_state *state = cachedVBStates[vbucketId];
890    if (state) {
891        if (state->state == vbstate.state &&
892            state->checkpointId == vbstate.checkpointId &&
893            state->failovers.compare(vbstate.failovers) == 0) {
894            return true; // no changes
895        }
896        state->state = vbstate.state;
897        state->checkpointId = vbstate.checkpointId;
898        state->failovers = vbstate.failovers;
899        // Note that max deleted seq number is maintained within CouchKVStore
900        vbstate.maxDeletedSeqno = state->maxDeletedSeqno;
901    } else {
902        cachedVBStates[vbucketId] = new vbucket_state(vbstate);
903    }
904
905    if (!setVBucketState(vbucketId, vbstate, cb)) {
906        LOG(EXTENSION_LOG_WARNING,
907                "Warning: failed to set new state, %s, for vbucket %d\n",
908                VBucket::toString(vbstate.state), vbucketId);
909        return false;
910    }
911    return true;
912}
913
914bool CouchKVStore::snapshotStats(const std::map<std::string,
915                                 std::string> &stats) {
916    cb_assert(!isReadOnly());
917    size_t count = 0;
918    size_t size = stats.size();
919    std::stringstream stats_buf;
920    stats_buf << "{";
921    std::map<std::string, std::string>::const_iterator it = stats.begin();
922    for (; it != stats.end(); ++it) {
923        stats_buf << "\"" << it->first << "\": \"" << it->second << "\"";
924        ++count;
925        if (count < size) {
926            stats_buf << ", ";
927        }
928    }
929    stats_buf << "}";
930
931    // TODO: This stats json should be written into the master database. However,
932    // we don't support the write synchronization between CouchKVStore in C++ and
933    // compaction manager in the erlang side for the master database yet. At this time,
934    // we simply log the engine stats into a separate json file. As part of futhre work,
935    // we need to get rid of the tight coupling between those two components.
936    bool rv = true;
937    std::string next_fname = dbname + "/stats.json.new";
938    std::ofstream new_stats;
939    new_stats.exceptions (new_stats.failbit | new_stats.badbit);
940    try {
941        new_stats.open(next_fname.c_str());
942        new_stats << stats_buf.str().c_str() << std::endl;
943        new_stats.flush();
944        new_stats.close();
945    } catch (const std::ofstream::failure& e) {
946        LOG(EXTENSION_LOG_WARNING, "Warning: failed to log the engine stats to "
947            "file \"%s\" due to IO exception \"%s\"; Not critical because new "
948            "stats will be dumped later, please ignore.",
949            next_fname.c_str(), e.what());
950        rv = false;
951    }
952
953    if (rv) {
954        std::string old_fname = dbname + "/stats.json.old";
955        std::string stats_fname = dbname + "/stats.json";
956        if (access(old_fname.c_str(), F_OK) == 0 && remove(old_fname.c_str()) != 0) {
957            LOG(EXTENSION_LOG_WARNING, "FATAL: Failed to remove '%s': %s",
958                old_fname.c_str(), strerror(errno));
959            remove(next_fname.c_str());
960            rv = false;
961        } else if (access(stats_fname.c_str(), F_OK) == 0 &&
962                   rename(stats_fname.c_str(), old_fname.c_str()) != 0) {
963            LOG(EXTENSION_LOG_WARNING,
964                "Warning: failed to rename '%s' to '%s': %s",
965                stats_fname.c_str(), old_fname.c_str(), strerror(errno));
966            remove(next_fname.c_str());
967            rv = false;
968        } else if (rename(next_fname.c_str(), stats_fname.c_str()) != 0) {
969            LOG(EXTENSION_LOG_WARNING,
970                "Warning: failed to rename '%s' to '%s': %s",
971                next_fname.c_str(), stats_fname.c_str(), strerror(errno));
972            remove(next_fname.c_str());
973            rv = false;
974        }
975    }
976
977    return rv;
978}
979
980bool CouchKVStore::setVBucketState(uint16_t vbucketId, vbucket_state &vbstate,
981                                   Callback<kvstats_ctx> *kvcb) {
982    Db *db = NULL;
983    uint64_t fileRev, newFileRev;
984    std::stringstream id;
985    std::string dbFileName;
986    std::map<uint16_t, uint64_t>::iterator mapItr;
987    kvstats_ctx kvctx;
988    kvctx.vbucket = vbucketId;
989
990    id << vbucketId;
991    dbFileName = dbname + "/" + id.str() + ".couch." + id.str();
992    fileRev = dbFileRevMap[vbucketId];
993
994    couchstore_error_t errorCode;
995    errorCode = openDB(vbucketId, fileRev, &db,
996            (uint64_t)COUCHSTORE_OPEN_FLAG_CREATE, &newFileRev);
997    if (errorCode != COUCHSTORE_SUCCESS) {
998        ++st.numVbSetFailure;
999        LOG(EXTENSION_LOG_WARNING,
1000                "Warning: failed to open database, name=%s",
1001                dbFileName.c_str());
1002        return false;
1003    }
1004
1005    fileRev = newFileRev;
1006
1007    vbucket_state *state = cachedVBStates[vbucketId];
1008    vbstate.highSeqno = state->highSeqno;
1009    vbstate.lastSnapStart = state->lastSnapStart;
1010    vbstate.lastSnapEnd = state->lastSnapEnd;
1011    vbstate.maxDeletedSeqno = state->maxDeletedSeqno;
1012
1013    errorCode = saveVBState(db, vbstate);
1014    if (errorCode != COUCHSTORE_SUCCESS) {
1015        ++st.numVbSetFailure;
1016        LOG(EXTENSION_LOG_WARNING,
1017                "Warning: failed to save local doc, name=%s",
1018                dbFileName.c_str());
1019        closeDatabaseHandle(db);
1020        return false;
1021    }
1022
1023    errorCode = couchstore_commit(db);
1024    if (errorCode != COUCHSTORE_SUCCESS) {
1025        ++st.numVbSetFailure;
1026        LOG(EXTENSION_LOG_WARNING,
1027                "Warning: commit failed, vbid=%u rev=%llu error=%s [%s]",
1028                vbucketId, fileRev, couchstore_strerror(errorCode),
1029                couchkvstore_strerrno(db, errorCode).c_str());
1030        closeDatabaseHandle(db);
1031        return false;
1032    }
1033    if (kvcb) {
1034        DbInfo info;
1035        couchstore_db_info(db, &info);
1036        kvctx.fileSpaceUsed = info.space_used;
1037        kvctx.fileSize = info.file_size;
1038        kvcb->callback(kvctx);
1039    }
1040    closeDatabaseHandle(db);
1041
1042    return true;
1043}
1044
1045void CouchKVStore::dump(std::vector<uint16_t> &vbids,
1046                        shared_ptr<Callback<GetValue> > cb,
1047                        shared_ptr<Callback<CacheLookup> > cl) {
1048
1049    shared_ptr<Callback<SeqnoRange> > sr(new NoRangeCallback());
1050    std::vector<uint16_t>::iterator itr = vbids.begin();
1051    for (; itr != vbids.end(); ++itr) {
1052        loadDB(cb, cl, sr, false, *itr, 0, COUCHSTORE_NO_DELETES);
1053    }
1054}
1055
/**
 * Load keys and values of a single vbucket starting at a given sequence
 * number.
 *
 * Forwards to loadDB() with keysOnly = false. The options argument is not
 * passed, so loadDB()'s declared default applies (declaration not visible
 * in this file section).
 *
 * @param vb      vbucket to load
 * @param stSeqno sequence number to start from
 * @param cb      callback handed to loadDB() for each loaded item
 * @param cl      cache lookup callback handed to loadDB()
 * @param sr      callback that receives the sequence number range
 */
void CouchKVStore::dump(uint16_t vb, uint64_t stSeqno,
                        shared_ptr<Callback<GetValue> > cb,
                        shared_ptr<Callback<CacheLookup> > cl,
                        shared_ptr<Callback<SeqnoRange> > sr) {

    loadDB(cb, cl, sr, false, vb, stSeqno);
}
1063
1064void CouchKVStore::dumpKeys(std::vector<uint16_t> &vbids,
1065                            shared_ptr<Callback<GetValue> > cb) {
1066
1067    shared_ptr<Callback<CacheLookup> > cl(new NoLookupCallback());
1068    shared_ptr<Callback<SeqnoRange> > sr(new NoRangeCallback());
1069    std::vector<uint16_t>::iterator itr = vbids.begin();
1070    for (; itr != vbids.end(); ++itr) {
1071        loadDB(cb, cl, sr, true, *itr, 0, COUCHSTORE_NO_DELETES);
1072    }
1073}
1074
1075void CouchKVStore::dumpDeleted(uint16_t vb, uint64_t stSeqno, uint64_t enSeqno,
1076                               shared_ptr<Callback<GetValue> > cb) {
1077
1078    std::vector<uint16_t> vbids;
1079    vbids.push_back(vb);
1080    shared_ptr<Callback<CacheLookup> > cl(new NoLookupCallback());
1081    shared_ptr<Callback<SeqnoRange> > sr(new NoRangeCallback());
1082    loadDB(cb, cl, sr, true, vb, stSeqno, COUCHSTORE_DELETES_ONLY);
1083}
1084
1085StorageProperties CouchKVStore::getStorageProperties() {
1086    StorageProperties rv(true, true, true, true);
1087    return rv;
1088}
1089
1090bool CouchKVStore::commit(Callback<kvstats_ctx> *cb, uint64_t snapStartSeqno,
1091                          uint64_t snapEndSeqno) {
1092    cb_assert(!isReadOnly());
1093    if (intransaction) {
1094        if (commit2couchstore(cb, snapStartSeqno, snapEndSeqno)) {
1095            intransaction = false;
1096        }
1097    }
1098
1099    return !intransaction;
1100}
1101
1102uint64_t CouchKVStore::getLastPersistedSeqno(uint16_t vbid) {
1103    vbucket_state *state = cachedVBStates[vbid];
1104    if (state) {
1105        return state->highSeqno;
1106    }
1107    return 0;
1108}
1109
1110void CouchKVStore::addStats(const std::string &prefix,
1111                            ADD_STAT add_stat,
1112                            const void *c) {
1113    const char *prefix_str = prefix.c_str();
1114
1115    /* stats for both read-only and read-write threads */
1116    addStat(prefix_str, "backend_type",   "couchstore",       add_stat, c);
1117    addStat(prefix_str, "open",           st.numOpen,         add_stat, c);
1118    addStat(prefix_str, "close",          st.numClose,        add_stat, c);
1119    addStat(prefix_str, "readTime",       st.readTimeHisto,   add_stat, c);
1120    addStat(prefix_str, "readSize",       st.readSizeHisto,   add_stat, c);
1121    addStat(prefix_str, "numLoadedVb",    st.numLoadedVb,     add_stat, c);
1122
1123    // failure stats
1124    addStat(prefix_str, "failure_open",   st.numOpenFailure, add_stat, c);
1125    addStat(prefix_str, "failure_get",    st.numGetFailure,  add_stat, c);
1126
1127    if (!isReadOnly()) {
1128        addStat(prefix_str, "failure_set",   st.numSetFailure,   add_stat, c);
1129        addStat(prefix_str, "failure_del",   st.numDelFailure,   add_stat, c);
1130        addStat(prefix_str, "failure_vbset", st.numVbSetFailure, add_stat, c);
1131        addStat(prefix_str, "lastCommDocs",  st.docsCommitted,   add_stat, c);
1132    }
1133}
1134
1135void CouchKVStore::addTimingStats(const std::string &prefix,
1136                                  ADD_STAT add_stat, const void *c) {
1137    if (isReadOnly()) {
1138        return;
1139    }
1140    const char *prefix_str = prefix.c_str();
1141    addStat(prefix_str, "commit",      st.commitHisto,      add_stat, c);
1142    addStat(prefix_str, "compact",     st.compactHisto,     add_stat, c);
1143    addStat(prefix_str, "delete",      st.delTimeHisto,     add_stat, c);
1144    addStat(prefix_str, "save_documents", st.saveDocsHisto, add_stat, c);
1145    addStat(prefix_str, "writeTime",   st.writeTimeHisto,   add_stat, c);
1146    addStat(prefix_str, "writeSize",   st.writeSizeHisto,   add_stat, c);
1147    addStat(prefix_str, "bulkSize",    st.batchSize,        add_stat, c);
1148
1149    // Couchstore file ops stats
1150    addStat(prefix_str, "fsReadTime",  st.fsStats.readTimeHisto,  add_stat, c);
1151    addStat(prefix_str, "fsWriteTime", st.fsStats.writeTimeHisto, add_stat, c);
1152    addStat(prefix_str, "fsSyncTime",  st.fsStats.syncTimeHisto,  add_stat, c);
1153    addStat(prefix_str, "fsReadSize",  st.fsStats.readSizeHisto,  add_stat, c);
1154    addStat(prefix_str, "fsWriteSize", st.fsStats.writeSizeHisto, add_stat, c);
1155    addStat(prefix_str, "fsReadSeek",  st.fsStats.readSeekHisto,  add_stat, c);
1156}
1157
1158template <typename T>
1159void CouchKVStore::addStat(const std::string &prefix, const char *stat, T &val,
1160                           ADD_STAT add_stat, const void *c) {
1161    std::stringstream fullstat;
1162    fullstat << prefix << ":" << stat;
1163    add_casted_stat(fullstat.str().c_str(), val, add_stat, c);
1164}
1165
1166void CouchKVStore::optimizeWrites(std::vector<queued_item> &items) {
1167    cb_assert(!isReadOnly());
1168    if (items.empty()) {
1169        return;
1170    }
1171    CompareQueuedItemsBySeqnoAndKey cq;
1172    std::sort(items.begin(), items.end(), cq);
1173}
1174
1175void CouchKVStore::pendingTasks() {
1176
1177    if (!pendingFileDeletions.empty()) {
1178        std::queue<std::string> queue;
1179        pendingFileDeletions.getAll(queue);
1180
1181        while (!queue.empty()) {
1182            std::string filename_str = queue.front();
1183            int errCode;
1184#ifdef _MSC_VER
1185            errCode = _unlink(filename_str.c_str());
1186#else
1187            errCode = unlink(filename_str.c_str());
1188#endif
1189            if (errCode == -1) {
1190                LOG(EXTENSION_LOG_WARNING, "Failed to unlink file '%s' with error "
1191                    "code: %d", filename_str.c_str(), errno);
1192                if (errno != ENOENT) {
1193                    pendingFileDeletions.push(filename_str);
1194                }
1195            }
1196            queue.pop();
1197        }
1198    }
1199}
1200
1201void CouchKVStore::loadDB(shared_ptr<Callback<GetValue> > cb,
1202                          shared_ptr<Callback<CacheLookup> > cl,
1203                          shared_ptr<Callback<SeqnoRange> > sr,
1204                          bool keysOnly, uint16_t vbid,
1205                          uint64_t startSeqno,
1206                          couchstore_docinfos_options options) {
1207    if (!dbFileRevMapPopulated) {
1208        // warmup, first discover db files from local directory
1209        std::vector<std::string> files;
1210        discoverDbFiles(dbname, files);
1211        populateFileNameMap(files, NULL);
1212    }
1213
1214    Db *db = NULL;
1215    uint64_t rev = dbFileRevMap[vbid];
1216    couchstore_error_t errorCode = openDB(vbid, rev, &db,
1217                                          COUCHSTORE_OPEN_FLAG_RDONLY);
1218    if (errorCode != COUCHSTORE_SUCCESS) {
1219        LOG(EXTENSION_LOG_WARNING,
1220            "Failed to open database, name=%s/%d.couch.%lu",
1221            dbname.c_str(), vbid, rev);
1222        remVBucketFromDbFileMap(vbid);
1223    } else {
1224        DbInfo info;
1225        errorCode = couchstore_db_info(db, &info);
1226        if (errorCode != COUCHSTORE_SUCCESS) {
1227            LOG(EXTENSION_LOG_WARNING, "Failed to read DB info for backfill");
1228            closeDatabaseHandle(db);
1229            abort();
1230        }
1231        SeqnoRange range(startSeqno, info.last_sequence);
1232        sr->callback(range);
1233
1234        LoadResponseCtx ctx;
1235        ctx.vbucketId = vbid;
1236        ctx.keysonly = keysOnly;
1237        ctx.callback = cb;
1238        ctx.lookup = cl;
1239        ctx.stats = &epStats;
1240        errorCode = couchstore_changes_since(db, startSeqno, options,
1241                                             recordDbDumpC,
1242                                             static_cast<void *>(&ctx));
1243        if (errorCode != COUCHSTORE_SUCCESS) {
1244            if (errorCode == COUCHSTORE_ERROR_CANCEL) {
1245                LOG(EXTENSION_LOG_WARNING,
1246                    "Canceling loading database, warmup has completed\n");
1247            } else {
1248                LOG(EXTENSION_LOG_WARNING,
1249                    "couchstore_changes_since failed, error=%s [%s]",
1250                    couchstore_strerror(errorCode),
1251                    couchkvstore_strerrno(db, errorCode).c_str());
1252                remVBucketFromDbFileMap(vbid);
1253            }
1254        }
1255        closeDatabaseHandle(db);
1256    }
1257}
1258
1259void CouchKVStore::open() {
1260    // TODO intransaction, is it needed?
1261    intransaction = false;
1262
1263    struct stat dbstat;
1264    bool havedir = false;
1265
1266    if (stat(dbname.c_str(), &dbstat) == 0 &&
1267                            (dbstat.st_mode & S_IFDIR) == S_IFDIR) {
1268        havedir = true;
1269    }
1270
1271    if (!havedir) {
1272        if (mkdir(dbname.c_str(), S_IRWXU) == -1) {
1273            std::stringstream ss;
1274            ss << "Warning: Failed to create data directory ["
1275               << dbname << "]: " << strerror(errno);
1276            throw std::runtime_error(ss.str());
1277        }
1278    }
1279}
1280
/**
 * Close the store. The only action taken here is clearing the
 * in-transaction flag.
 */
void CouchKVStore::close() {
    intransaction = false;
}
1284
1285uint64_t CouchKVStore::checkNewRevNum(std::string &dbFileName, bool newFile) {
1286    uint64_t newrev = 0;
1287    std::string nameKey;
1288
1289    if (!newFile) {
1290        // extract out the file revision number first
1291        size_t secondDot = dbFileName.rfind(".");
1292        nameKey = dbFileName.substr(0, secondDot);
1293    } else {
1294        nameKey = dbFileName;
1295    }
1296    nameKey.append(".");
1297    const std::vector<std::string> files = findFilesWithPrefix(nameKey);
1298    std::vector<std::string>::const_iterator itor;
1299    // found file(s) whoes name has the same key name pair with different
1300    // revision number
1301    for (itor = files.begin(); itor != files.end(); ++itor) {
1302        const std::string &filename = *itor;
1303        if (endWithCompact(filename)) {
1304            continue;
1305        }
1306
1307        size_t secondDot = filename.rfind(".");
1308        char *ptr = NULL;
1309        uint64_t revnum = strtoull(filename.substr(secondDot + 1).c_str(), &ptr, 10);
1310        if (newrev < revnum) {
1311            newrev = revnum;
1312            dbFileName = filename;
1313        }
1314    }
1315    return newrev;
1316}
1317
1318void CouchKVStore::updateDbFileMap(uint16_t vbucketId, uint64_t newFileRev) {
1319    if (vbucketId >= numDbFiles) {
1320        LOG(EXTENSION_LOG_WARNING, NULL,
1321            "Warning: cannot update db file map for an invalid vbucket, "
1322            "vbucket id = %d, rev = %lld\n", vbucketId, newFileRev);
1323        return;
1324    }
1325
1326    dbFileRevMap[vbucketId] = newFileRev;
1327}
1328
1329couchstore_error_t CouchKVStore::openDB(uint16_t vbucketId,
1330                                        uint64_t fileRev,
1331                                        Db **db,
1332                                        uint64_t options,
1333                                        uint64_t *newFileRev) {
1334    std::string dbFileName = getDBFileName(dbname, vbucketId, fileRev);
1335    couch_file_ops* ops = &statCollectingFileOps;
1336
1337    uint64_t newRevNum = fileRev;
1338    couchstore_error_t errorCode = COUCHSTORE_SUCCESS;
1339
1340    if (options == COUCHSTORE_OPEN_FLAG_CREATE) {
1341        // first try to open the requested file without the create option
1342        // in case it does already exist
1343        errorCode = couchstore_open_db_ex(dbFileName.c_str(), 0, ops, db);
1344        if (errorCode != COUCHSTORE_SUCCESS) {
1345            // open_db failed but still check if the file exists
1346            newRevNum = checkNewRevNum(dbFileName);
1347            bool fileExists = (newRevNum) ? true : false;
1348            if (fileExists) {
1349                errorCode = openDB_retry(dbFileName, 0, ops, db, &newRevNum);
1350            } else {
1351                // requested file doesn't seem to exist, just create one
1352                errorCode = couchstore_open_db_ex(dbFileName.c_str(), options,
1353                                                  ops, db);
1354                if (errorCode == COUCHSTORE_SUCCESS) {
1355                    newRevNum = 1;
1356                    updateDbFileMap(vbucketId, fileRev);
1357                    LOG(EXTENSION_LOG_INFO,
1358                        "INFO: created new couch db file, name=%s rev=%llu",
1359                        dbFileName.c_str(), fileRev);
1360                }
1361            }
1362        }
1363    } else {
1364        errorCode = openDB_retry(dbFileName, options, ops, db, &newRevNum);
1365    }
1366
1367    /* update command statistics */
1368    st.numOpen++;
1369    if (errorCode) {
1370        st.numOpenFailure++;
1371        LOG(EXTENSION_LOG_WARNING, "Warning: couchstore_open_db failed, name=%s"
1372            " option=%X rev=%llu error=%s [%s]\n", dbFileName.c_str(), options,
1373            ((newRevNum > fileRev) ? newRevNum : fileRev),
1374            couchstore_strerror(errorCode),
1375            getSystemStrerror().c_str());
1376    } else {
1377        if (newRevNum > fileRev) {
1378            // new revision number found, update it
1379            updateDbFileMap(vbucketId, newRevNum);
1380        }
1381    }
1382
1383    if (newFileRev != NULL) {
1384        *newFileRev = (newRevNum > fileRev) ? newRevNum : fileRev;
1385    }
1386    return errorCode;
1387}
1388
1389couchstore_error_t CouchKVStore::openDB_retry(std::string &dbfile,
1390                                              uint64_t options,
1391                                              const couch_file_ops *ops,
1392                                              Db** db, uint64_t *newFileRev) {
1393    int retry = 0;
1394    couchstore_error_t errCode = COUCHSTORE_SUCCESS;
1395
1396    while (retry < MAX_OPEN_DB_RETRY) {
1397        errCode = couchstore_open_db_ex(dbfile.c_str(), options, ops, db);
1398        if (errCode == COUCHSTORE_SUCCESS) {
1399            return errCode;
1400        }
1401        LOG(EXTENSION_LOG_INFO, "INFO: couchstore_open_db failed, name=%s "
1402            "options=%X error=%s [%s], try it again!",
1403            dbfile.c_str(), options, couchstore_strerror(errCode),
1404            getSystemStrerror().c_str());
1405        *newFileRev = checkNewRevNum(dbfile);
1406        ++retry;
1407        if (retry == MAX_OPEN_DB_RETRY - 1 && options == 0 &&
1408            errCode == COUCHSTORE_ERROR_NO_SUCH_FILE) {
1409            options = COUCHSTORE_OPEN_FLAG_CREATE;
1410        }
1411    }
1412    return errCode;
1413}
1414
1415void CouchKVStore::populateFileNameMap(std::vector<std::string> &filenames,
1416                                       std::vector<uint16_t> *vbids) {
1417    std::vector<std::string>::iterator fileItr;
1418
1419    for (fileItr = filenames.begin(); fileItr != filenames.end(); ++fileItr) {
1420        const std::string &filename = *fileItr;
1421        size_t secondDot = filename.rfind(".");
1422        std::string nameKey = filename.substr(0, secondDot);
1423        size_t firstDot = nameKey.rfind(".");
1424#ifdef _MSC_VER
1425        size_t firstSlash = nameKey.rfind("\\");
1426#else
1427        size_t firstSlash = nameKey.rfind("/");
1428#endif
1429
1430        std::string revNumStr = filename.substr(secondDot + 1);
1431        char *ptr = NULL;
1432        uint64_t revNum = strtoull(revNumStr.c_str(), &ptr, 10);
1433
1434        std::string vbIdStr = nameKey.substr(firstSlash + 1,
1435                                            (firstDot - firstSlash) - 1);
1436        if (allDigit(vbIdStr)) {
1437            int vbId = atoi(vbIdStr.c_str());
1438            if (vbids) {
1439                vbids->push_back(static_cast<uint16_t>(vbId));
1440            }
1441            uint64_t old_rev_num = dbFileRevMap[vbId];
1442            if (old_rev_num == revNum) {
1443                continue;
1444            } else if (old_rev_num < revNum) { // stale revision found
1445                dbFileRevMap[vbId] = revNum;
1446            } else { // stale file found (revision id has rolled over)
1447                old_rev_num = revNum;
1448            }
1449            std::stringstream old_file;
1450            old_file << dbname << "/" << vbId << ".couch." << old_rev_num;
1451            if (access(old_file.str().c_str(), F_OK) == 0) {
1452                if (remove(old_file.str().c_str()) != 0) {
1453                    LOG(EXTENSION_LOG_WARNING,
1454                        "Warning: Failed to remove the stale file '%s': %s",
1455                        old_file.str().c_str(), getSystemStrerror().c_str());
1456                } else {
1457                    LOG(EXTENSION_LOG_WARNING,
1458                        "Warning: Removed stale file '%s'",
1459                        old_file.str().c_str());
1460                }
1461            }
1462        } else {
1463            // skip non-vbucket database file, master.couch etc
1464            LOG(EXTENSION_LOG_DEBUG,
1465                "Non-vbucket database file, %s, skip adding "
1466                "to CouchKVStore dbFileMap\n", filename.c_str());
1467        }
1468    }
1469    dbFileRevMapPopulated = true;
1470}
1471
1472couchstore_error_t CouchKVStore::fetchDoc(Db *db, DocInfo *docinfo,
1473                                          GetValue &docValue, uint16_t vbId,
1474                                          bool metaOnly, bool fetchDelete) {
1475    couchstore_error_t errCode = COUCHSTORE_SUCCESS;
1476    sized_buf metadata = docinfo->rev_meta;
1477    uint32_t itemFlags;
1478    uint64_t cas;
1479    time_t exptime;
1480    uint8_t ext_meta[EXT_META_LEN];
1481    uint8_t ext_len;
1482
1483    cb_assert(metadata.size >= DEFAULT_META_LEN);
1484    if (metadata.size == DEFAULT_META_LEN) {
1485        memcpy(&cas, (metadata.buf), 8);
1486        memcpy(&exptime, (metadata.buf) + 8, 4);
1487        memcpy(&itemFlags, (metadata.buf) + 12, 4);
1488        ext_len = 0;
1489    } else {
1490        //metadata.size => 18, FLEX_META_CODE at offset 16
1491        memcpy(&cas, (metadata.buf), 8);
1492        memcpy(&exptime, (metadata.buf) + 8, 4);
1493        memcpy(&itemFlags, (metadata.buf) + 12, 4);
1494        ext_len = metadata.size - DEFAULT_META_LEN - FLEX_DATA_OFFSET;
1495        memcpy(ext_meta, (metadata.buf) + DEFAULT_META_LEN + FLEX_DATA_OFFSET,
1496               ext_len);
1497    }
1498    cas = ntohll(cas);
1499    exptime = ntohl(exptime);
1500
1501    if (metaOnly || (fetchDelete && docinfo->deleted)) {
1502        Item *it = new Item(docinfo->id.buf, (size_t)docinfo->id.size,
1503                            docinfo->size, itemFlags, (time_t)exptime,
1504                            ext_meta, ext_len, cas, docinfo->db_seq, vbId);
1505        if (docinfo->deleted) {
1506            it->setDeleted();
1507        }
1508        it->setRevSeqno(docinfo->rev_seq);
1509        docValue = GetValue(it);
1510        // update ep-engine IO stats
1511        ++epStats.io_num_read;
1512        epStats.io_read_bytes.fetch_add(docinfo->id.size);
1513    } else {
1514        Doc *doc = NULL;
1515        errCode = couchstore_open_doc_with_docinfo(db, docinfo, &doc,
1516                                                   DECOMPRESS_DOC_BODIES);
1517        if (errCode == COUCHSTORE_SUCCESS) {
1518            if (docinfo->deleted) {
1519                // do not read a doc that is marked deleted, just return the
1520                // error code as not found but still release the document body.
1521                errCode = COUCHSTORE_ERROR_DOC_NOT_FOUND;
1522            } else {
1523                cb_assert(doc && (doc->id.size <= UINT16_MAX));
1524                size_t valuelen = doc->data.size;
1525                void *valuePtr = doc->data.buf;
1526
1527                /**
1528                 * Set Datatype correctly if data is being
1529                 * read from couch files where datatype is
1530                 * not supported.
1531                 */
1532                if (metadata.size == DEFAULT_META_LEN) {
1533                    ext_len = EXT_META_LEN;
1534                    ext_meta[0] = determine_datatype((const unsigned char*)valuePtr,
1535                                                     valuelen);
1536                }
1537
1538                Item *it = new Item(docinfo->id.buf, (size_t)docinfo->id.size,
1539                                    itemFlags, (time_t)exptime, valuePtr, valuelen,
1540                                    ext_meta, ext_len, cas, docinfo->db_seq, vbId,
1541                                    docinfo->rev_seq);
1542                docValue = GetValue(it);
1543
1544                // update ep-engine IO stats
1545                ++epStats.io_num_read;
1546                epStats.io_read_bytes.fetch_add(docinfo->id.size + valuelen);
1547            }
1548            couchstore_free_document(doc);
1549        }
1550    }
1551    return errCode;
1552}
1553
1554int CouchKVStore::recordDbDump(Db *db, DocInfo *docinfo, void *ctx) {
1555
1556    LoadResponseCtx *loadCtx = (LoadResponseCtx *)ctx;
1557    shared_ptr<Callback<GetValue> > cb = loadCtx->callback;
1558    shared_ptr<Callback<CacheLookup> > cl = loadCtx->lookup;
1559
1560    Doc *doc = NULL;
1561    void *valuePtr = NULL;
1562    size_t valuelen = 0;
1563    uint64_t byseqno = docinfo->db_seq;
1564    sized_buf  metadata = docinfo->rev_meta;
1565    uint16_t vbucketId = loadCtx->vbucketId;
1566    sized_buf key = docinfo->id;
1567    uint32_t itemflags;
1568    uint64_t cas;
1569    uint32_t exptime;
1570    uint8_t ext_meta[EXT_META_LEN];
1571    uint8_t ext_len;
1572
1573    cb_assert(key.size <= UINT16_MAX);
1574    cb_assert(metadata.size >= DEFAULT_META_LEN);
1575
1576    std::string docKey(docinfo->id.buf, docinfo->id.size);
1577    CacheLookup lookup(docKey, byseqno, vbucketId);
1578    cl->callback(lookup);
1579    if (cl->getStatus() == ENGINE_KEY_EEXISTS) {
1580        return COUCHSTORE_SUCCESS;
1581    }
1582
1583    if (metadata.size == DEFAULT_META_LEN) {
1584        memcpy(&cas, (metadata.buf), 8);
1585        memcpy(&exptime, (metadata.buf) + 8, 4);
1586        memcpy(&itemflags, (metadata.buf) + 12, 4);
1587        ext_len = 0;
1588    } else {
1589        //metadata.size > 16, FLEX_META_CODE at offset 16
1590        memcpy(&cas, (metadata.buf), 8);
1591        memcpy(&exptime, (metadata.buf) + 8, 4);
1592        memcpy(&itemflags, (metadata.buf) + 12, 4);
1593        ext_len = metadata.size - DEFAULT_META_LEN - FLEX_DATA_OFFSET;
1594        memcpy(ext_meta, (metadata.buf) + DEFAULT_META_LEN + FLEX_DATA_OFFSET,
1595               ext_len);
1596    }
1597    exptime = ntohl(exptime);
1598    cas = ntohll(cas);
1599
1600    if (!loadCtx->keysonly && !docinfo->deleted) {
1601        couchstore_error_t errCode ;
1602        errCode = couchstore_open_doc_with_docinfo(db, docinfo, &doc,
1603                                                   DECOMPRESS_DOC_BODIES);
1604
1605        if (errCode == COUCHSTORE_SUCCESS) {
1606            if (doc->data.size) {
1607                valuelen = doc->data.size;
1608                valuePtr = doc->data.buf;
1609
1610                /**
1611                 * Set Datatype correctly if data is being
1612                 * read from couch files where datatype is
1613                 * not supported.
1614                 */
1615                if (metadata.size == DEFAULT_META_LEN) {
1616                    ext_len = EXT_META_LEN;
1617                    ext_meta[0] = determine_datatype((const unsigned char*)valuePtr,
1618                                                     valuelen);
1619                }
1620            }
1621        } else {
1622            LOG(EXTENSION_LOG_WARNING,
1623                "Warning: failed to retrieve key value from database "
1624                "database, vBucket=%d key=%s error=%s [%s]\n",
1625                vbucketId, key.buf, couchstore_strerror(errCode),
1626                couchkvstore_strerrno(db, errCode).c_str());
1627            return COUCHSTORE_SUCCESS;
1628        }
1629    }
1630
1631    Item *it = new Item((void *)key.buf,
1632                        key.size,
1633                        itemflags,
1634                        (time_t)exptime,
1635                        valuePtr, valuelen,
1636                        ext_meta, ext_len,
1637                        cas,
1638                        docinfo->db_seq, // return seq number being persisted on disk
1639                        vbucketId,
1640                        docinfo->rev_seq);
1641    if (docinfo->deleted) {
1642        it->setDeleted();
1643    }
1644
1645
1646    GetValue rv(it, ENGINE_SUCCESS, -1, loadCtx->keysonly);
1647    cb->callback(rv);
1648
1649    couchstore_free_document(doc);
1650
1651    if (cb->getStatus() == ENGINE_ENOMEM) {
1652        return COUCHSTORE_ERROR_CANCEL;
1653    }
1654    return COUCHSTORE_SUCCESS;
1655}
1656
1657bool CouchKVStore::commit2couchstore(Callback<kvstats_ctx> *cb,
1658                                     uint64_t snapStartSeqno,
1659                                     uint64_t snapEndSeqno) {
1660    bool success = true;
1661
1662    size_t pendingCommitCnt = pendingReqsQ.size();
1663    if (pendingCommitCnt == 0) {
1664        return success;
1665    }
1666
1667    Doc **docs = new Doc *[pendingCommitCnt];
1668    DocInfo **docinfos = new DocInfo *[pendingCommitCnt];
1669
1670    cb_assert(pendingReqsQ[0]);
1671    uint16_t vbucket2flush = pendingReqsQ[0]->getVBucketId();
1672    uint64_t fileRev = pendingReqsQ[0]->getRevNum();
1673    for (size_t i = 0; i < pendingCommitCnt; ++i) {
1674        CouchRequest *req = pendingReqsQ[i];
1675        cb_assert(req);
1676        docs[i] = req->getDbDoc();
1677        docinfos[i] = req->getDbDocInfo();
1678        cb_assert(vbucket2flush == req->getVBucketId());
1679    }
1680
1681    kvstats_ctx kvctx;
1682    kvctx.vbucket = vbucket2flush;
1683    // flush all
1684    couchstore_error_t errCode = saveDocs(vbucket2flush, fileRev, docs,
1685                                          docinfos, pendingCommitCnt, kvctx,
1686                                          snapStartSeqno, snapEndSeqno);
1687    if (errCode) {
1688        LOG(EXTENSION_LOG_WARNING,
1689            "Warning: commit failed, cannot save CouchDB docs "
1690            "for vbucket = %d rev = %llu\n", vbucket2flush, fileRev);
1691        ++epStats.commitFailed;
1692    }
1693    if (cb) {
1694        cb->callback(kvctx);
1695    }
1696    commitCallback(pendingReqsQ, kvctx, errCode);
1697
1698    // clean up
1699    for (size_t i = 0; i < pendingCommitCnt; ++i) {
1700        delete pendingReqsQ[i];
1701    }
1702    pendingReqsQ.clear();
1703    delete [] docs;
1704    delete [] docinfos;
1705    return success;
1706}
1707
1708static int readDocInfos(Db *db, DocInfo *docinfo, void *ctx) {
1709    cb_assert(ctx);
1710    kvstats_ctx *cbCtx = static_cast<kvstats_ctx *>(ctx);
1711    if(docinfo) {
1712        // An item exists in the VB DB file.
1713        if (!docinfo->deleted) {
1714            std::string key(docinfo->id.buf, docinfo->id.size);
1715            unordered_map<std::string, kstat_entry_t>::iterator itr =
1716                cbCtx->keyStats.find(key);
1717            if (itr != cbCtx->keyStats.end()) {
1718                itr->second.first = true;
1719            }
1720        }
1721    }
1722    return 0;
1723}
1724
1725couchstore_error_t CouchKVStore::saveDocs(uint16_t vbid, uint64_t rev,
1726                                          Doc **docs, DocInfo **docinfos,
1727                                          size_t docCount, kvstats_ctx &kvctx,
1728                                          uint64_t snapStartSeqno,
1729                                          uint64_t snapEndSeqno) {
1730    couchstore_error_t errCode;
1731    uint64_t fileRev = rev;
1732    DbInfo info;
1733    cb_assert(fileRev);
1734
1735    Db *db = NULL;
1736    uint64_t newFileRev;
1737    errCode = openDB(vbid, fileRev, &db, 0, &newFileRev);
1738    if (errCode != COUCHSTORE_SUCCESS) {
1739        LOG(EXTENSION_LOG_WARNING,
1740                "Warning: failed to open database, vbucketId = %d "
1741                "fileRev = %llu numDocs = %d", vbid, fileRev, docCount);
1742        return errCode;
1743    } else {
1744        uint64_t max = computeMaxDeletedSeqNum(docinfos, docCount);
1745
1746        vbucket_state *state = cachedVBStates[vbid];
1747        cb_assert(state);
1748
1749        // update max_deleted_seq in the local doc (vbstate)
1750        // before save docs for the given vBucket
1751        if (max > 0 && state->maxDeletedSeqno < max) {
1752            state->maxDeletedSeqno = max;
1753        }
1754
1755        uint64_t maxDBSeqno = 0;
1756        sized_buf *ids = new sized_buf[docCount];
1757        for (size_t idx = 0; idx < docCount; idx++) {
1758            ids[idx] = docinfos[idx]->id;
1759            maxDBSeqno = std::max(maxDBSeqno, docinfos[idx]->db_seq);
1760            std::string key(ids[idx].buf, ids[idx].size);
1761            kvctx.keyStats[key] = std::make_pair(false,
1762                    !docinfos[idx]->deleted);
1763        }
1764        couchstore_docinfos_by_id(db, ids, (unsigned) docCount, 0,
1765                readDocInfos, &kvctx);
1766        delete[] ids;
1767
1768        hrtime_t cs_begin = gethrtime();
1769        uint64_t flags = COMPRESS_DOC_BODIES | COUCHSTORE_SEQUENCE_AS_IS;
1770        errCode = couchstore_save_documents(db, docs, docinfos,
1771                (unsigned) docCount, flags);
1772        st.saveDocsHisto.add((gethrtime() - cs_begin) / 1000);
1773        if (errCode != COUCHSTORE_SUCCESS) {
1774            LOG(EXTENSION_LOG_WARNING,
1775                    "Warning: failed to save docs to database, numDocs = %d "
1776                    "error=%s [%s]\n", docCount, couchstore_strerror(errCode),
1777                    couchkvstore_strerrno(db, errCode).c_str());
1778            closeDatabaseHandle(db);
1779            return errCode;
1780        }
1781
1782        state->lastSnapStart = snapStartSeqno;
1783        state->lastSnapEnd = snapEndSeqno;
1784        errCode = saveVBState(db, *state);
1785        if (errCode != COUCHSTORE_SUCCESS) {
1786            LOG(EXTENSION_LOG_WARNING, "Warning: failed to save local docs to "
1787                "database, error=%s [%s]", couchstore_strerror(errCode),
1788                couchkvstore_strerrno(db, errCode).c_str());
1789                closeDatabaseHandle(db);
1790                return errCode;
1791        }
1792
1793        cs_begin = gethrtime();
1794        errCode = couchstore_commit(db);
1795        st.commitHisto.add((gethrtime() - cs_begin) / 1000);
1796        if (errCode) {
1797            LOG(EXTENSION_LOG_WARNING,
1798                    "Warning: couchstore_commit failed, error=%s [%s]",
1799                    couchstore_strerror(errCode),
1800                    couchkvstore_strerrno(db, errCode).c_str());
1801            closeDatabaseHandle(db);
1802            return errCode;
1803        }
1804
1805        st.batchSize.add(docCount);
1806
1807        // retrieve storage system stats for file fragmentation computation
1808        couchstore_db_info(db, &info);
1809        kvctx.fileSpaceUsed = info.space_used;
1810        kvctx.fileSize = info.file_size;
1811        cachedDeleteCount[vbid] = info.deleted_count;
1812        cachedDocCount[vbid] = info.doc_count;
1813
1814        if (maxDBSeqno != info.last_sequence) {
1815            LOG(EXTENSION_LOG_WARNING, "Seqno in db header (%llu) is not matched with "
1816                "what was persisted (%llu) for vbucket %d", info.last_sequence,
1817                maxDBSeqno, vbid);
1818        }
1819        state->highSeqno = info.last_sequence;
1820
1821        closeDatabaseHandle(db);
1822    }
1823
1824    /* update stat */
1825    if(errCode == COUCHSTORE_SUCCESS) {
1826        st.docsCommitted = docCount;
1827    }
1828
1829    return errCode;
1830}
1831
1832void CouchKVStore::remVBucketFromDbFileMap(uint16_t vbucketId) {
1833    if (vbucketId >= numDbFiles) {
1834        LOG(EXTENSION_LOG_WARNING, NULL,
1835            "Warning: cannot remove db file map entry for an invalid vbucket, "
1836            "vbucket id = %d\n", vbucketId);
1837        return;
1838    }
1839
1840    // just reset revision number of the requested vbucket
1841    dbFileRevMap[vbucketId] = 1;
1842}
1843
1844void CouchKVStore::commitCallback(std::vector<CouchRequest *> &committedReqs,
1845                                  kvstats_ctx &kvctx,
1846                                  couchstore_error_t errCode) {
1847    size_t commitSize = committedReqs.size();
1848
1849    for (size_t index = 0; index < commitSize; index++) {
1850        size_t dataSize = committedReqs[index]->getNBytes();
1851        size_t keySize = committedReqs[index]->getKey().length();
1852        /* update ep stats */
1853        ++epStats.io_num_write;
1854        epStats.io_write_bytes.fetch_add(keySize + dataSize);
1855
1856        if (committedReqs[index]->isDelete()) {
1857            int rv = getMutationStatus(errCode);
1858            if (rv != -1) {
1859                const std::string &key = committedReqs[index]->getKey();
1860                if (kvctx.keyStats[key].first) {
1861                    rv = 1; // Deletion is for an existing item on DB file.
1862                } else {
1863                    rv = 0; // Deletion is for a non-existing item on DB file.
1864                }
1865            }
1866            if (errCode) {
1867                ++st.numDelFailure;
1868            } else {
1869                st.delTimeHisto.add(committedReqs[index]->getDelta() / 1000);
1870            }
1871            committedReqs[index]->getDelCallback()->callback(rv);
1872        } else {
1873            int rv = getMutationStatus(errCode);
1874            const std::string &key = committedReqs[index]->getKey();
1875            bool insertion = !kvctx.keyStats[key].first;
1876            if (errCode) {
1877                ++st.numSetFailure;
1878            } else {
1879                st.writeTimeHisto.add(committedReqs[index]->getDelta() / 1000);
1880                st.writeSizeHisto.add(dataSize + keySize);
1881            }
1882            mutation_result p(rv, insertion);
1883            committedReqs[index]->getSetCallback()->callback(p);
1884        }
1885    }
1886}
1887
1888void CouchKVStore::readVBState(Db *db, uint16_t vbId) {
1889    sized_buf id;
1890    LocalDoc *ldoc = NULL;
1891    couchstore_error_t errCode;
1892
1893    vbucket_state_t state = vbucket_state_dead;
1894    uint64_t checkpointId = 0;
1895    uint64_t maxDeletedSeqno = 0;
1896    int64_t highSeqno = 0;
1897    std::string failovers("[{\"id\":0,\"seq\":0}]");
1898    uint64_t purgeSeqno = 0;
1899    uint64_t lastSnapStart = 0;
1900    uint64_t lastSnapEnd = 0;
1901
1902    DbInfo info;
1903    errCode = couchstore_db_info(db, &info);
1904    if (errCode == COUCHSTORE_SUCCESS) {
1905        highSeqno = info.last_sequence;
1906        purgeSeqno = info.purge_seq;
1907    } else {
1908        LOG(EXTENSION_LOG_WARNING,
1909            "Warning: failed to read database info for vBucket = %d", vbId);
1910        abort();
1911    }
1912
1913    id.buf = (char *)"_local/vbstate";
1914    id.size = sizeof("_local/vbstate") - 1;
1915    errCode = couchstore_open_local_document(db, (void *)id.buf,
1916                                             id.size, &ldoc);
1917    if (errCode != COUCHSTORE_SUCCESS) {
1918        LOG(EXTENSION_LOG_DEBUG,
1919            "Warning: failed to retrieve stat info for vBucket=%d error=%s [%s]",
1920            vbId, couchstore_strerror(errCode),
1921            couchkvstore_strerrno(db, errCode).c_str());
1922    } else {
1923        const std::string statjson(ldoc->json.buf, ldoc->json.size);
1924        cJSON *jsonObj = cJSON_Parse(statjson.c_str());
1925        if (!jsonObj) {
1926            LOG(EXTENSION_LOG_WARNING,
1927                "Warning: failed to parse the vbstat json doc for vbucket %d: %s",
1928                vbId, statjson.c_str());
1929            couchstore_free_local_document(ldoc);
1930            abort();
1931        }
1932
1933        const std::string vb_state = getJSONObjString(
1934                                cJSON_GetObjectItem(jsonObj, "state"));
1935        const std::string checkpoint_id = getJSONObjString(
1936                                cJSON_GetObjectItem(jsonObj,"checkpoint_id"));
1937        const std::string max_deleted_seqno = getJSONObjString(
1938                                cJSON_GetObjectItem(jsonObj, "max_deleted_seqno"));
1939        const std::string snapStart = getJSONObjString(
1940                                cJSON_GetObjectItem(jsonObj, "snap_start"));
1941        const std::string snapEnd = getJSONObjString(
1942                                cJSON_GetObjectItem(jsonObj, "snap_end"));
1943        cJSON *failover_json = cJSON_GetObjectItem(jsonObj, "failover_table");
1944        if (vb_state.compare("") == 0 || checkpoint_id.compare("") == 0
1945                || max_deleted_seqno.compare("") == 0) {
1946            LOG(EXTENSION_LOG_WARNING,
1947                "Warning: state JSON doc for vbucket %d is in the wrong format: %s",
1948                vbId, statjson.c_str());
1949        } else {
1950            state = VBucket::fromString(vb_state.c_str());
1951            parseUint64(max_deleted_seqno.c_str(), &maxDeletedSeqno);
1952            parseUint64(checkpoint_id.c_str(), &checkpointId);
1953
1954            if (snapStart.compare("") == 0) {
1955                lastSnapStart = highSeqno;
1956            } else {
1957                parseUint64(snapStart.c_str(), &lastSnapStart);
1958            }
1959
1960            if (snapEnd.compare("") == 0) {
1961                lastSnapEnd = highSeqno;
1962            } else {
1963                parseUint64(snapEnd.c_str(), &lastSnapEnd);
1964            }
1965
1966            if (failover_json) {
1967                char* json = cJSON_PrintUnformatted(failover_json);
1968                failovers.assign(json);
1969                free(json);
1970            }
1971        }
1972        cJSON_Delete(jsonObj);
1973        couchstore_free_local_document(ldoc);
1974    }
1975
1976    delete cachedVBStates[vbId];
1977    cachedVBStates[vbId] = new vbucket_state(state, checkpointId,
1978                                               maxDeletedSeqno, highSeqno,
1979                                               purgeSeqno, lastSnapStart,
1980                                               lastSnapEnd, failovers);
1981}
1982
1983couchstore_error_t CouchKVStore::saveVBState(Db *db, vbucket_state &vbState) {
1984    std::stringstream jsonState;
1985
1986    jsonState << "{\"state\": \"" << VBucket::toString(vbState.state) << "\""
1987              << ",\"checkpoint_id\": \"" << vbState.checkpointId << "\""
1988              << ",\"max_deleted_seqno\": \"" << vbState.maxDeletedSeqno << "\""
1989              << ",\"failover_table\": " << vbState.failovers
1990              << ",\"snap_start\": \"" << vbState.lastSnapStart << "\""
1991              << ",\"snap_end\": \"" << vbState.lastSnapEnd << "\""
1992              << "}";
1993
1994    LocalDoc lDoc;
1995    lDoc.id.buf = (char *)"_local/vbstate";
1996    lDoc.id.size = sizeof("_local/vbstate") - 1;
1997    std::string state = jsonState.str();
1998    lDoc.json.buf = (char *)state.c_str();
1999    lDoc.json.size = state.size();
2000    lDoc.deleted = 0;
2001
2002    couchstore_error_t errCode = couchstore_save_local_document(db, &lDoc);
2003    if (errCode != COUCHSTORE_SUCCESS) {
2004        LOG(EXTENSION_LOG_WARNING,
2005            "Warning: couchstore_save_local_document failed "
2006            "error=%s [%s]\n", couchstore_strerror(errCode),
2007            couchkvstore_strerrno(db, errCode).c_str());
2008    }
2009    return errCode;
2010}
2011
2012int CouchKVStore::getMultiCb(Db *db, DocInfo *docinfo, void *ctx) {
2013    cb_assert(docinfo);
2014    std::string keyStr(docinfo->id.buf, docinfo->id.size);
2015    cb_assert(ctx);
2016    GetMultiCbCtx *cbCtx = static_cast<GetMultiCbCtx *>(ctx);
2017    CouchKVStoreStats &st = cbCtx->cks.getCKVStoreStat();
2018
2019
2020    vb_bgfetch_queue_t::iterator qitr = cbCtx->fetches.find(keyStr);
2021    if (qitr == cbCtx->fetches.end()) {
2022        // this could be a serious race condition in couchstore,
2023        // log a warning message and continue
2024        LOG(EXTENSION_LOG_WARNING,
2025            "Warning: couchstore returned invalid docinfo, "
2026            "no pending bgfetch has been issued for key = %s\n",
2027            keyStr.c_str());
2028        return 0;
2029    }
2030
2031    bool meta_only = true;
2032    std::list<VBucketBGFetchItem *> &fetches = (*qitr).second;
2033    std::list<VBucketBGFetchItem *>::iterator itr = fetches.begin();
2034    for (; itr != fetches.end(); ++itr) {
2035        if (!((*itr)->metaDataOnly)) {
2036            meta_only = false;
2037            break;
2038        }
2039    }
2040
2041    GetValue returnVal;
2042    couchstore_error_t errCode = cbCtx->cks.fetchDoc(db, docinfo, returnVal,
2043                                                     cbCtx->vbId, meta_only);
2044    if (errCode != COUCHSTORE_SUCCESS && !meta_only) {
2045        LOG(EXTENSION_LOG_WARNING, "Warning: failed to fetch data from database, "
2046            "vBucket=%d key=%s error=%s [%s]", cbCtx->vbId,
2047            keyStr.c_str(), couchstore_strerror(errCode),
2048            couchkvstore_strerrno(db, errCode).c_str());
2049        st.numGetFailure++;
2050    }
2051
2052    returnVal.setStatus(cbCtx->cks.couchErr2EngineErr(errCode));
2053    for (itr = fetches.begin(); itr != fetches.end(); ++itr) {
2054        // populate return value for remaining fetch items with the
2055        // same seqid
2056        (*itr)->value = returnVal;
2057        st.readTimeHisto.add((gethrtime() - (*itr)->initTime) / 1000);
2058        if (errCode == COUCHSTORE_SUCCESS) {
2059            st.readSizeHisto.add(returnVal.getValue()->getNKey() +
2060                                 returnVal.getValue()->getNBytes());
2061        }
2062    }
2063    return 0;
2064}
2065
2066
2067void CouchKVStore::closeDatabaseHandle(Db *db) {
2068    couchstore_error_t ret = couchstore_close_db(db);
2069    if (ret != COUCHSTORE_SUCCESS) {
2070        LOG(EXTENSION_LOG_WARNING,
2071            "Warning: couchstore_close_db failed, error=%s [%s]",
2072            couchstore_strerror(ret), couchkvstore_strerrno(NULL, ret).c_str());
2073    }
2074    st.numClose++;
2075}
2076
2077ENGINE_ERROR_CODE CouchKVStore::couchErr2EngineErr(couchstore_error_t errCode) {
2078    switch (errCode) {
2079    case COUCHSTORE_SUCCESS:
2080        return ENGINE_SUCCESS;
2081    case COUCHSTORE_ERROR_ALLOC_FAIL:
2082        return ENGINE_ENOMEM;
2083    case COUCHSTORE_ERROR_DOC_NOT_FOUND:
2084        return ENGINE_KEY_ENOENT;
2085    case COUCHSTORE_ERROR_NO_SUCH_FILE:
2086    case COUCHSTORE_ERROR_NO_HEADER:
2087    default:
2088        // same as the general error return code of
2089        // EvetuallyPersistentStore::getInternal
2090        return ENGINE_TMPFAIL;
2091    }
2092}
2093
2094size_t CouchKVStore::getEstimatedItemCount(std::vector<uint16_t> &vbs) {
2095    size_t items = 0;
2096    std::vector<uint16_t>::iterator it;
2097    for (it = vbs.begin(); it != vbs.end(); ++it) {
2098        items += getNumItems(*it);
2099    }
2100    return items;
2101}
2102
2103size_t CouchKVStore::getNumPersistedDeletes(uint16_t vbid) {
2104    size_t delCount = cachedDeleteCount[vbid];
2105    if (delCount != (size_t) -1) {
2106        return delCount;
2107    }
2108
2109    if (!dbFileRevMapPopulated) {
2110        std::vector<std::string> files;
2111        // first scan all db files from data directory
2112        discoverDbFiles(dbname, files);
2113        populateFileNameMap(files, NULL);
2114    }
2115
2116    Db *db = NULL;
2117    uint64_t rev = dbFileRevMap[vbid];
2118    couchstore_error_t errCode = openDB(vbid, rev, &db,
2119                                        COUCHSTORE_OPEN_FLAG_RDONLY);
2120    if (errCode == COUCHSTORE_SUCCESS) {
2121        DbInfo info;
2122        errCode = couchstore_db_info(db, &info);
2123        if (errCode == COUCHSTORE_SUCCESS) {
2124            cachedDeleteCount[vbid] = info.deleted_count;
2125            closeDatabaseHandle(db);
2126            return info.deleted_count;
2127        } else {
2128            LOG(EXTENSION_LOG_WARNING,
2129                "Warning: failed to read database info for "
2130                "vBucket = %d rev = %llu\n", vbid, rev);
2131        }
2132        closeDatabaseHandle(db);
2133    } else {
2134        LOG(EXTENSION_LOG_WARNING,
2135            "Warning: failed to open database file for "
2136            "vBucket = %d rev = %llu\n", vbid, rev);
2137    }
2138    return 0;
2139
2140}
2141
2142size_t CouchKVStore::getNumItems(uint16_t vbid) {
2143    size_t docCount = cachedDocCount[vbid];
2144    if ( docCount != (size_t) -1) {
2145        return docCount;
2146    }
2147
2148    if (!dbFileRevMapPopulated) {
2149        std::vector<std::string> files;
2150        // first scan all db files from data directory
2151        discoverDbFiles(dbname, files);
2152        populateFileNameMap(files, NULL);
2153    }
2154
2155    Db *db = NULL;
2156    uint64_t rev = dbFileRevMap[vbid];
2157    couchstore_error_t errCode = openDB(vbid, rev, &db,
2158                                        COUCHSTORE_OPEN_FLAG_RDONLY);
2159    if (errCode == COUCHSTORE_SUCCESS) {
2160        DbInfo info;
2161        errCode = couchstore_db_info(db, &info);
2162        if (errCode == COUCHSTORE_SUCCESS) {
2163            cachedDocCount[vbid] = info.doc_count;
2164            closeDatabaseHandle(db);
2165            return info.doc_count;
2166        } else {
2167            LOG(EXTENSION_LOG_WARNING,
2168                "Warning: failed to read database info for "
2169                "vBucket = %d rev = %llu\n", vbid, rev);
2170        }
2171        closeDatabaseHandle(db);
2172    } else {
2173        LOG(EXTENSION_LOG_WARNING,
2174            "Warning: failed to open database file for "
2175            "vBucket = %d rev = %llu\n", vbid, rev);
2176    }
2177    return 0;
2178}
2179
2180size_t CouchKVStore::getNumItems(uint16_t vbid, uint64_t min_seq,
2181                                 uint64_t max_seq) {
2182    if (!dbFileRevMapPopulated) {
2183        std::vector<std::string> files;
2184        discoverDbFiles(dbname, files);
2185        populateFileNameMap(files, NULL);
2186    }
2187
2188    Db *db = NULL;
2189    uint64_t count = 0;
2190    uint64_t rev = dbFileRevMap[vbid];
2191    couchstore_error_t errCode = openDB(vbid, rev, &db,
2192                                        COUCHSTORE_OPEN_FLAG_RDONLY);
2193    if (errCode == COUCHSTORE_SUCCESS) {
2194        errCode = couchstore_changes_count(db, min_seq, max_seq, &count);
2195        if (errCode != COUCHSTORE_SUCCESS) {
2196            LOG(EXTENSION_LOG_WARNING, "Failed to get changes count for "
2197                "vBucket = %d rev = %llu", vbid, rev);
2198        }
2199        closeDatabaseHandle(db);
2200    } else {
2201        LOG(EXTENSION_LOG_WARNING, "Failed to open database file for vBucket"
2202            " = %d rev = %llu", vbid, rev);
2203    }
2204    return count;
2205}
2206
2207RollbackResult CouchKVStore::rollback(uint16_t vbid, uint64_t rollbackSeqno,
2208                                      shared_ptr<RollbackCB> cb) {
2209
2210    Db *db = NULL;
2211    DbInfo info;
2212    uint64_t fileRev = dbFileRevMap[vbid];
2213    std::stringstream dbFileName;
2214    dbFileName << dbname << "/" << vbid << ".couch." << fileRev;
2215    couchstore_error_t errCode;
2216
2217    errCode = openDB(vbid, fileRev, &db,
2218                     (uint64_t) COUCHSTORE_OPEN_FLAG_RDONLY);
2219
2220    if (errCode == COUCHSTORE_SUCCESS) {
2221        errCode = couchstore_db_info(db, &info);
2222        if (errCode != COUCHSTORE_SUCCESS) {
2223            LOG(EXTENSION_LOG_WARNING,
2224                "Failed to read DB info, name=%s",
2225                dbFileName.str().c_str());
2226            closeDatabaseHandle(db);
2227            return RollbackResult(false, 0, 0, 0);
2228        }
2229    } else {
2230        LOG(EXTENSION_LOG_WARNING,
2231                "Failed to open database, name=%s",
2232                dbFileName.str().c_str());
2233        return RollbackResult(false, 0, 0, 0);
2234    }
2235
2236    uint64_t latestSeqno = info.last_sequence;
2237
2238    //Count from latest seq no to 0
2239    uint64_t totSeqCount = 0;
2240    errCode = couchstore_changes_count(db, 0, latestSeqno, &totSeqCount);
2241    if (errCode != COUCHSTORE_SUCCESS) {
2242        LOG(EXTENSION_LOG_WARNING, "Failed to get changes count for "
2243            "rollback vBucket = %d, rev = %llu", vbid, fileRev);
2244        closeDatabaseHandle(db);
2245        return RollbackResult(false, 0, 0, 0);
2246    }
2247
2248    Db *newdb = NULL;
2249    errCode = openDB(vbid, fileRev, &newdb, 0);
2250    if (errCode != COUCHSTORE_SUCCESS) {
2251        LOG(EXTENSION_LOG_WARNING,
2252                "Failed to open database, name=%s",
2253                dbFileName.str().c_str());
2254        closeDatabaseHandle(db);
2255        return RollbackResult(false, 0, 0, 0);
2256    }
2257
2258    while (info.last_sequence > rollbackSeqno) {
2259        errCode = couchstore_rewind_db_header(newdb);
2260        if (errCode != COUCHSTORE_SUCCESS) {
2261            LOG(EXTENSION_LOG_WARNING,
2262                    "Failed to rewind Db pointer "
2263                    "for couch file with vbid: %u, whose "
2264                    "lastSeqno: %llu, while trying to roll back "
2265                    "to seqNo: %llu", vbid, latestSeqno, rollbackSeqno);
2266            //Reset the vbucket and send the entire snapshot,
2267            //as a previous header wasn't found.
2268            closeDatabaseHandle(db);
2269            return RollbackResult(false, 0, 0, 0);
2270        }
2271        errCode = couchstore_db_info(newdb, &info);
2272        if (errCode != COUCHSTORE_SUCCESS) {
2273            LOG(EXTENSION_LOG_WARNING,
2274                "Failed to read DB info, name=%s",
2275                dbFileName.str().c_str());
2276            closeDatabaseHandle(db);
2277            closeDatabaseHandle(newdb);
2278            return RollbackResult(false, 0, 0, 0);
2279        }
2280    }
2281
2282    //Count from latest seq no to rollback seq no
2283    uint64_t rollbackSeqCount = 0;
2284    errCode = couchstore_changes_count(db, info.last_sequence, latestSeqno,
2285                                       &rollbackSeqCount);
2286    if (errCode != COUCHSTORE_SUCCESS) {
2287        LOG(EXTENSION_LOG_WARNING, "Failed to get changes count for "
2288            "rollback vBucket = %d, rev = %llu", vbid, fileRev);
2289        closeDatabaseHandle(db);
2290        closeDatabaseHandle(newdb);
2291        return RollbackResult(false, 0, 0, 0);
2292    }
2293
2294    if ((totSeqCount / 2) <= rollbackSeqCount) {
2295        //doresetVbucket flag set or rollback is greater than 50%,
2296        //reset the vbucket and send the entire snapshot
2297        closeDatabaseHandle(db);
2298        closeDatabaseHandle(newdb);
2299        return RollbackResult(false, 0, 0, 0);
2300    }
2301
2302    cb->setDbHeader(newdb);
2303    shared_ptr<Callback<CacheLookup> > cl(new NoLookupCallback());
2304    LoadResponseCtx ctx;
2305    ctx.vbucketId = vbid;
2306    ctx.keysonly = true;
2307    ctx.lookup = cl;
2308    ctx.callback = cb;
2309    ctx.stats = &epStats;
2310    errCode = couchstore_changes_since(db, info.last_sequence + 1,
2311                                       COUCHSTORE_NO_OPTIONS,
2312                                       recordDbDumpC,
2313                                       static_cast<void *>(&ctx));
2314    if (errCode != COUCHSTORE_SUCCESS) {
2315        if (errCode == COUCHSTORE_ERROR_CANCEL) {
2316            LOG(EXTENSION_LOG_WARNING,
2317                "Canceling loading database\n");
2318        } else {
2319            LOG(EXTENSION_LOG_WARNING,
2320                "Couchstore_changes_since failed, error=%s [%s]",
2321                couchstore_strerror(errCode),
2322                couchkvstore_strerrno(db, errCode).c_str());
2323        }
2324        closeDatabaseHandle(db);
2325        closeDatabaseHandle(newdb);
2326        return RollbackResult(false, 0, 0, 0);
2327    }
2328
2329    readVBState(newdb, vbid);
2330    cachedDeleteCount[vbid] = info.deleted_count;
2331    cachedDocCount[vbid] = info.doc_count;
2332
2333    closeDatabaseHandle(db);
2334    //Append the rewinded header to the database file, before closing handle
2335    errCode = couchstore_commit(newdb);
2336    closeDatabaseHandle(newdb);
2337
2338    if (errCode != COUCHSTORE_SUCCESS) {
2339        return RollbackResult(false, 0, 0, 0);
2340    }
2341
2342    vbucket_state *vb_state = cachedVBStates[vbid];
2343    return RollbackResult(true, vb_state->highSeqno,
2344                          vb_state->lastSnapStart, vb_state->lastSnapEnd);
2345}
2346
2347int populateAllKeys(Db *db, DocInfo *docinfo, void *ctx) {
2348    AllKeysCtx *allKeysCtx = (AllKeysCtx *)ctx;
2349    uint16_t keylen = docinfo->id.size;
2350    char *key = docinfo->id.buf;
2351    (allKeysCtx->cb)->addtoAllKeys(keylen, key);
2352    if (--(allKeysCtx->count) <= 0) {
2353        //Only when count met is less than the actual number of entries
2354        return COUCHSTORE_ERROR_CANCEL;
2355    }
2356    return COUCHSTORE_SUCCESS;
2357}
2358
2359ENGINE_ERROR_CODE CouchKVStore::getAllKeys(uint16_t vbid,
2360                                           std::string &start_key,
2361                                           uint32_t count,
2362                                           AllKeysCB *cb) {
2363    Db *db = NULL;
2364    uint64_t rev = dbFileRevMap[vbid];
2365    couchstore_error_t errCode = openDB(vbid, rev, &db,
2366                                        COUCHSTORE_OPEN_FLAG_RDONLY);
2367    if(errCode == COUCHSTORE_SUCCESS) {
2368        sized_buf ref = {NULL, 0};
2369        ref.buf = (char*) start_key.c_str();
2370        ref.size = start_key.size();
2371        AllKeysCtx ctx(cb, count);
2372        errCode = couchstore_all_docs(db, &ref, COUCHSTORE_NO_OPTIONS,
2373                                      populateAllKeys,
2374                                      static_cast<void *>(&ctx));
2375        closeDatabaseHandle(db);
2376        if (errCode == COUCHSTORE_SUCCESS ||
2377                errCode == COUCHSTORE_ERROR_CANCEL)  {
2378            return ENGINE_SUCCESS;
2379        } else {
2380            LOG(EXTENSION_LOG_WARNING, "couchstore_all_docs failed for "
2381                    "database file of vbucket = %d rev = %llu, errCode = %u\n",
2382                    vbid, rev, errCode);
2383        }
2384    } else {
2385        LOG(EXTENSION_LOG_WARNING, "Failed to open database file for "
2386                "vbucket = %d rev = %llu, errCode = %u\n", vbid, rev, errCode);
2387
2388    }
2389    return ENGINE_FAILED;
2390}
2391
2392void CouchKVStore::unlinkCouchFile(uint16_t vbucket,
2393                                   uint64_t fRev) {
2394
2395    int errCode;
2396    char fname[PATH_MAX];
2397    snprintf(fname, sizeof(fname), "%s/%d.couch.%llu",
2398             dbname.c_str(), vbucket, fRev);
2399#ifdef _MSC_VER
2400    errCode = _unlink(fname);
2401#else
2402    errCode = unlink(fname);
2403#endif
2404    if (errCode == -1) {
2405        LOG(EXTENSION_LOG_WARNING, "Failed to unlink database file for "
2406            "vbucket = %d rev = %llu, errCode = %u\n", vbucket, fRev, errno);
2407
2408        if (errno != ENOENT) {
2409            std::string file_str = fname;
2410            pendingFileDeletions.push(file_str);
2411        }
2412    }
2413}
2414
2415void CouchKVStore::removeCompactFile(const std::string &dbname,
2416                                     uint16_t vbid,
2417                                     uint64_t fileRev) {
2418
2419    std::string dbfile = getDBFileName(dbname, vbid, fileRev);
2420    std::string compact_file = dbfile + ".compact";
2421    removeCompactFile(compact_file);
2422}
2423
2424void CouchKVStore::removeCompactFile(const std::string &filename) {
2425
2426    if (access(filename.c_str(), F_OK) == 0) {
2427        if (remove(filename.c_str()) == 0) {
2428            LOG(EXTENSION_LOG_WARNING,
2429                "Warning: Removed compact file '%s'", filename.c_str());
2430        }
2431        else {
2432            LOG(EXTENSION_LOG_WARNING,
2433                "Warning: Failed to remove compact file '%s': %s",
2434                filename.c_str(), getSystemStrerror().c_str());
2435
2436            if (errno != ENOENT) {
2437                pendingFileDeletions.push(const_cast<std::string &>(filename));
2438            }
2439        }
2440    }
2441}
2442
2443/* end of couch-kvstore.cc */
2444