1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2012 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "config.h"
19
20#ifdef _MSC_VER
21#include <direct.h>
22#define mkdir(a, b) _mkdir(a)
23#endif
24
25#include <fcntl.h>
26#include <stdio.h>
27#include <string.h>
28#include <sys/stat.h>
29#include <sys/types.h>
30
#include <algorithm>
#include <cctype>
#include <cstdlib>
#include <fstream>
#include <iostream>
#include <list>
#include <map>
#include <sstream>
#include <string>
#include <utility>
#include <vector>
41#include <platform/dirutils.h>
42#include <cJSON.h>
43
44#include "common.h"
45#include "couch-kvstore/couch-kvstore.h"
46#define STATWRITER_NAMESPACE couchstore_engine
47#include "statwriter.h"
48#undef STATWRITER_NAMESPACE
49
50using namespace CouchbaseDirectoryUtilities;
51
// File-local constants. An unnamed namespace gives them the same internal
// linkage the `static` keyword would.
namespace {

// Codes returned by getMutationStatus() for a persisted mutation.
const int MUTATION_FAILED = -1;
const int DOC_NOT_FOUND = 0;
const int MUTATION_SUCCESS = 1;

// NOTE(review): not referenced in the visible part of this file; presumably
// the retry limit used when opening a database file -- confirm at use site.
const int MAX_OPEN_DB_RETRY = 10;

// Size in bytes of the fixed metadata prefix: cas (8) + exptime (4) +
// flags (4), as laid out by the CouchRequest constructor.
const uint32_t DEFAULT_META_LEN = 16;

} // namespace
59
60class NoLookupCallback : public Callback<CacheLookup> {
61public:
62    NoLookupCallback() {}
63    ~NoLookupCallback() {}
64    void callback(CacheLookup&) {}
65};
66
67class NoRangeCallback : public Callback<SeqnoRange> {
68public:
69    NoRangeCallback() {}
70    ~NoRangeCallback() {}
71    void callback(SeqnoRange&) {}
72};
73
74extern "C" {
75    static int recordDbDumpC(Db *db, DocInfo *docinfo, void *ctx)
76    {
77        return CouchKVStore::recordDbDump(db, docinfo, ctx);
78    }
79}
80
81extern "C" {
82    static int getMultiCbC(Db *db, DocInfo *docinfo, void *ctx)
83    {
84        return CouchKVStore::getMultiCb(db, docinfo, ctx);
85    }
86}
87
88static std::string getStrError(Db *db) {
89    const size_t max_msg_len = 256;
90    char msg[max_msg_len];
91    couchstore_last_os_error(db, msg, max_msg_len);
92    std::string errorStr(msg);
93    return errorStr;
94}
95
/**
 * Describe the most recent system error of the calling thread.
 *
 * @return a string of the form "errno = <code>: '<message>'"
 */
static std::string getSystemStrerror(void) {
#ifdef WIN32
    // Read the error code before doing anything else that could reset it.
    const DWORD err = GetLastError();
    char* win_msg = NULL;
    const DWORD len = FormatMessageA(FORMAT_MESSAGE_ALLOCATE_BUFFER |
                                     FORMAT_MESSAGE_FROM_SYSTEM |
                                     FORMAT_MESSAGE_IGNORE_INSERTS,
                                     NULL, err,
                                     MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
                                     reinterpret_cast<LPSTR>(&win_msg),
                                     0, NULL);
    std::stringstream ss;
    ss << "errno = " << err << ": '";
    // FormatMessageA returns 0 and leaves win_msg untouched on failure;
    // streaming a NULL char* would be undefined behaviour.
    if (len != 0 && win_msg != NULL) {
        ss << win_msg;
    } else {
        ss << "unknown error";
    }
    ss << "'";
    if (win_msg != NULL) {
        LocalFree(win_msg);
    }
#else
    // Snapshot errno first: constructing the stream may itself touch errno.
    const int err = errno;
    std::stringstream ss;
    ss << "errno = " << err << ": '" << strerror(err) << "'";
#endif

    return ss.str();
}
116
117static const std::string getJSONObjString(const cJSON *i)
118{
119    if (i == NULL) {
120        return "";
121    }
122    if (i->type != cJSON_String) {
123        abort();
124    }
125    return i->valuestring;
126}
127
/**
 * Tell whether a file name ends with the ".compact" suffix.
 *
 * The comparison is anchored at the end of the name, so an earlier
 * ".compact" inside the name does not matter, and names shorter than the
 * suffix are rejected without any unsigned underflow.
 *
 * @param filename name (or path) to inspect
 * @return true iff the last 8 characters are ".compact"
 */
static bool endWithCompact(const std::string &filename)
{
    const char suffix[] = ".compact";
    // sizeof counts the terminating NUL; the suffix itself is one shorter.
    const size_t suffixLen = sizeof(suffix) - 1;

    if (filename.size() < suffixLen) {
        return false;
    }
    return filename.compare(filename.size() - suffixLen, suffixLen,
                            suffix) == 0;
}
136
137static void discoverDbFiles(const std::string &dir, std::vector<std::string> &v)
138{
139    std::vector<std::string> files = findFilesContaining(dir, ".couch");
140    std::vector<std::string>::iterator ii;
141    for (ii = files.begin(); ii != files.end(); ++ii) {
142        if (!endWithCompact(*ii)) {
143            v.push_back(*ii);
144        }
145    }
146}
147
148static uint64_t computeMaxDeletedSeqNum(DocInfo **docinfos, const int numdocs)
149{
150    uint64_t max = 0;
151    for (int idx = 0; idx < numdocs; idx++) {
152        if (docinfos[idx]->deleted) {
153            // check seq number only from a deleted file
154            uint64_t seqNum = docinfos[idx]->rev_seq;
155            max = std::max(seqNum, max);
156        }
157    }
158    return max;
159}
160
161static int getMutationStatus(couchstore_error_t errCode)
162{
163    switch (errCode) {
164    case COUCHSTORE_SUCCESS:
165        return MUTATION_SUCCESS;
166    case COUCHSTORE_ERROR_NO_HEADER:
167    case COUCHSTORE_ERROR_NO_SUCH_FILE:
168    case COUCHSTORE_ERROR_DOC_NOT_FOUND:
169        // this return causes ep engine to drop the failed flush
170        // of an item since it does not know about the itme any longer
171        return DOC_NOT_FOUND;
172    default:
173        // this return causes ep engine to keep requeuing the failed
174        // flush of an item
175        return MUTATION_FAILED;
176    }
177}
178
/**
 * Check that a string consists solely of decimal digits.
 *
 * @param input string to inspect (not modified)
 * @return true if every character is a digit; an empty string yields true
 */
static bool allDigit(std::string &input)
{
    size_t numchar = input.length();
    for(size_t i = 0; i < numchar; ++i) {
        // isdigit() requires a value representable as unsigned char (or
        // EOF); passing a negative plain char is undefined behaviour.
        if (!isdigit(static_cast<unsigned char>(input[i]))) {
            return false;
        }
    }
    return true;
}
189
190static std::string couchkvstore_strerrno(Db *db, couchstore_error_t err) {
191    return (err == COUCHSTORE_ERROR_OPEN_FILE ||
192            err == COUCHSTORE_ERROR_READ ||
193            err == COUCHSTORE_ERROR_WRITE) ? getStrError(db) : "none";
194}
195
196struct GetMultiCbCtx {
197    GetMultiCbCtx(CouchKVStore &c, uint16_t v, vb_bgfetch_queue_t &f) :
198        cks(c), vbId(v), fetches(f) {}
199
200    CouchKVStore &cks;
201    uint16_t vbId;
202    vb_bgfetch_queue_t &fetches;
203};
204
205struct StatResponseCtx {
206public:
207    StatResponseCtx(std::map<std::pair<uint16_t, uint16_t>, vbucket_state> &sm,
208                    uint16_t vb) : statMap(sm), vbId(vb) {
209        /* EMPTY */
210    }
211
212    std::map<std::pair<uint16_t, uint16_t>, vbucket_state> &statMap;
213    uint16_t vbId;
214};
215
// Plain aggregate carrying the state needed while loading documents.
// NOTE(review): it has no constructor, so vbucketId, keysonly and stats are
// indeterminate until the user assigns them.
struct LoadResponseCtx {
    // Callback receiving each loaded value.
    shared_ptr<Callback<GetValue> > callback;
    // Callback receiving cache-lookup notifications.
    shared_ptr<Callback<CacheLookup> > lookup;
    uint16_t vbucketId;
    // presumably true when only keys/metadata are wanted -- confirm at use site
    bool keysonly;
    EPStats *stats;
};
223
224struct AllKeysCtx {
225    AllKeysCtx(AllKeysCB *callback, uint32_t cnt) :
226        cb(callback), count(cnt) { }
227
228    AllKeysCB *cb;
229    uint32_t count;
230};
231
232CouchRequest::CouchRequest(const Item &it, uint64_t rev,
233                           CouchRequestCallback &cb, bool del) :
234    value(it.getValue()), vbucketId(it.getVBucketId()), fileRevNum(rev),
235    key(it.getKey()), deleteItem(del)
236{
237    uint64_t cas = htonll(it.getCas());
238    uint32_t flags = it.getFlags();
239    uint32_t vlen = it.getNBytes();
240    uint32_t exptime = it.getExptime();
241
242    // Datatype used to determine whether document requires compression or not
243    uint8_t datatype;
244
245    // Save time of deletion in expiry time field of deleted item's metadata.
246    if (del) {
247        exptime = ep_real_time();
248    }
249    exptime = htonl(exptime);
250
251    dbDoc.id.buf = const_cast<char *>(key.c_str());
252    dbDoc.id.size = it.getNKey();
253    if (vlen) {
254        dbDoc.data.buf = const_cast<char *>(value->getData());
255        dbDoc.data.size = vlen;
256        datatype = it.getDataType();
257    } else {
258        dbDoc.data.buf = NULL;
259        dbDoc.data.size = 0;
260        datatype = 0x00;
261    }
262
263    memset(meta, 0, sizeof(meta));
264    memcpy(meta, &cas, 8);
265    memcpy(meta + 8, &exptime, 4);
266    memcpy(meta + 12, &flags, 4);
267    *(meta + DEFAULT_META_LEN) = FLEX_META_CODE;
268    memcpy(meta + DEFAULT_META_LEN + FLEX_DATA_OFFSET, it.getExtMeta(),
269           it.getExtMetaLen());
270
271    dbDocInfo.db_seq = it.getBySeqno();
272    dbDocInfo.rev_meta.buf = reinterpret_cast<char *>(meta);
273    dbDocInfo.rev_meta.size = COUCHSTORE_METADATA_SIZE;
274    dbDocInfo.rev_seq = it.getRevSeqno();
275    dbDocInfo.size = dbDoc.data.size;
276    if (del) {
277        dbDocInfo.deleted =  1;
278        callback.delCb = cb.delCb;
279    } else {
280        dbDocInfo.deleted = 0;
281        callback.setCb = cb.setCb;
282    }
283    dbDocInfo.id = dbDoc.id;
284    dbDocInfo.content_meta = (datatype == PROTOCOL_BINARY_DATATYPE_JSON) ?
285                                    COUCH_DOC_IS_JSON : COUCH_DOC_NON_JSON_MODE;
286
287    //Compress only those documents that aren't already compressed.
288    if (dbDoc.data.size > 0 && !deleteItem) {
289        if (datatype == PROTOCOL_BINARY_RAW_BYTES ||
290                datatype == PROTOCOL_BINARY_DATATYPE_JSON) {
291            dbDocInfo.content_meta |= COUCH_DOC_IS_COMPRESSED;
292        }
293    }
294    start = gethrtime();
295}
296
297CouchKVStore::CouchKVStore(EPStats &stats, Configuration &config, bool read_only) :
298    KVStore(read_only), epStats(stats), configuration(config),
299    dbname(configuration.getDbname()), couchNotifier(NULL),
300    intransaction(false), dbFileRevMapPopulated(false)
301{
302    open();
303    statCollectingFileOps = getCouchstoreStatsOps(&st.fsStats);
304
305    // init db file map with default revision number, 1
306    numDbFiles = static_cast<uint16_t>(configuration.getMaxVbuckets());
307    for (uint16_t i = 0; i < numDbFiles; i++) {
308        dbFileRevMap.push_back(1);
309    }
310}
311
312CouchKVStore::CouchKVStore(const CouchKVStore &copyFrom) :
313    KVStore(copyFrom), epStats(copyFrom.epStats),
314    configuration(copyFrom.configuration),
315    dbname(copyFrom.dbname),
316    couchNotifier(NULL), dbFileRevMap(copyFrom.dbFileRevMap),
317    numDbFiles(copyFrom.numDbFiles),
318    intransaction(false),
319    dbFileRevMapPopulated(copyFrom.dbFileRevMapPopulated)
320{
321    open();
322    statCollectingFileOps = getCouchstoreStatsOps(&st.fsStats);
323}
324
325void CouchKVStore::reset(uint16_t shardId)
326{
327    cb_assert(!isReadOnly());
328    // TODO CouchKVStore::flush() when couchstore api ready
329
330    if (shardId == 0) {
331        //Notify when PRIMARY_SHARD
332        RememberingCallback<bool> cb;
333        couchNotifier->flush(cb);
334        cb.waitForValue();
335    }
336
337    vbucket_map_t::iterator itor = cachedVBStates.begin();
338    for (; itor != cachedVBStates.end(); ++itor) {
339        uint16_t vbucket = itor->first;
340        if (vbucket % configuration.getMaxNumShards() != shardId) {
341            continue;
342        }
343        itor->second.checkpointId = 0;
344        itor->second.maxDeletedSeqno = 0;
345        resetVBucket(vbucket, itor->second);
346        updateDbFileMap(vbucket, 1);
347    }
348}
349
350void CouchKVStore::set(const Item &itm, Callback<mutation_result> &cb)
351{
352    cb_assert(!isReadOnly());
353    cb_assert(intransaction);
354    bool deleteItem = false;
355    CouchRequestCallback requestcb;
356    std::string dbFile;
357    uint64_t fileRev = dbFileRevMap[itm.getVBucketId()];
358
359    // each req will be de-allocated after commit
360    requestcb.setCb = &cb;
361    CouchRequest *req = new CouchRequest(itm, fileRev, requestcb, deleteItem);
362    pendingReqsQ.push_back(req);
363}
364
365void CouchKVStore::get(const std::string &key, uint64_t, uint16_t vb,
366                       Callback<GetValue> &cb, bool fetchDelete)
367{
368    Db *db = NULL;
369    std::string dbFile;
370    GetValue rv;
371    uint64_t fileRev = dbFileRevMap[vb];
372
373    couchstore_error_t errCode = openDB(vb, fileRev, &db,
374                                        COUCHSTORE_OPEN_FLAG_RDONLY);
375    if (errCode != COUCHSTORE_SUCCESS) {
376        ++st.numGetFailure;
377        LOG(EXTENSION_LOG_WARNING,
378            "Warning: failed to open database to retrieve data "
379            "from vBucketId = %d, key = %s, file = %s\n",
380            vb, key.c_str(), dbFile.c_str());
381        rv.setStatus(couchErr2EngineErr(errCode));
382        cb.callback(rv);
383        return;
384    }
385
386    getWithHeader(db, key, vb, cb, fetchDelete);
387    closeDatabaseHandle(db);
388}
389
390void CouchKVStore::getWithHeader(void *dbHandle, const std::string &key,
391                                 uint16_t vb, Callback<GetValue> &cb,
392                                 bool fetchDelete) {
393
394    Db *db = (Db *)dbHandle;
395    hrtime_t start = gethrtime();
396    RememberingCallback<GetValue> *rc = dynamic_cast<RememberingCallback<GetValue> *>(&cb);
397    bool getMetaOnly = rc && rc->val.isPartial();
398    DocInfo *docInfo = NULL;
399    sized_buf id;
400    GetValue rv;
401    std::string dbFile;
402
403    id.size = key.size();
404    id.buf = const_cast<char *>(key.c_str());
405    couchstore_error_t errCode = couchstore_docinfo_by_id(db, (uint8_t *)id.buf,
406                                                          id.size, &docInfo);
407    if (errCode != COUCHSTORE_SUCCESS) {
408        if (!getMetaOnly) {
409            // log error only if this is non-xdcr case
410            LOG(EXTENSION_LOG_WARNING,
411                "Warning: failed to retrieve doc info from "
412                "database, name=%s key=%s error=%s [%s]\n",
413                dbFile.c_str(), id.buf, couchstore_strerror(errCode),
414                couchkvstore_strerrno(db, errCode).c_str());
415        }
416    } else {
417        cb_assert(docInfo);
418        errCode = fetchDoc(db, docInfo, rv, vb, getMetaOnly, fetchDelete);
419        if (errCode != COUCHSTORE_SUCCESS) {
420            LOG(EXTENSION_LOG_WARNING,
421                "Warning: failed to retrieve key value from "
422                "database, name=%s key=%s error=%s [%s] "
423                "deleted=%s", dbFile.c_str(), id.buf,
424                couchstore_strerror(errCode),
425                couchkvstore_strerrno(db, errCode).c_str(),
426                docInfo->deleted ? "yes" : "no");
427        }
428
429        // record stats
430        st.readTimeHisto.add((gethrtime() - start) / 1000);
431        if (errCode == COUCHSTORE_SUCCESS) {
432            st.readSizeHisto.add(key.length() + rv.getValue()->getNBytes());
433        }
434    }
435
436    if(errCode != COUCHSTORE_SUCCESS) {
437        ++st.numGetFailure;
438    }
439
440    couchstore_free_docinfo(docInfo);
441    rv.setStatus(couchErr2EngineErr(errCode));
442    cb.callback(rv);
443}
444
445void CouchKVStore::getMulti(uint16_t vb, vb_bgfetch_queue_t &itms)
446{
447    std::string dbFile;
448    int numItems = itms.size();
449    uint64_t fileRev = dbFileRevMap[vb];
450
451    Db *db = NULL;
452    couchstore_error_t errCode = openDB(vb, fileRev, &db,
453                                        COUCHSTORE_OPEN_FLAG_RDONLY);
454    if (errCode != COUCHSTORE_SUCCESS) {
455        LOG(EXTENSION_LOG_WARNING,
456            "Warning: failed to open database for data fetch, "
457            "vBucketId = %d file = %s numDocs = %d\n",
458            vb, dbFile.c_str(), numItems);
459        st.numGetFailure.fetch_add(numItems);
460        vb_bgfetch_queue_t::iterator itr = itms.begin();
461        for (; itr != itms.end(); ++itr) {
462            std::list<VBucketBGFetchItem *> &fetches = (*itr).second;
463            std::list<VBucketBGFetchItem *>::iterator fitr = fetches.begin();
464            for (; fitr != fetches.end(); ++fitr) {
465                (*fitr)->value.setStatus(ENGINE_NOT_MY_VBUCKET);
466            }
467        }
468        return;
469    }
470
471    size_t idx = 0;
472    sized_buf *ids = new sized_buf[itms.size()];
473    vb_bgfetch_queue_t::iterator itr = itms.begin();
474    for (; itr != itms.end(); ++itr) {
475        ids[idx].size = itr->first.size();
476        ids[idx].buf = const_cast<char *>(itr->first.c_str());
477        ++idx;
478    }
479
480    GetMultiCbCtx ctx(*this, vb, itms);
481    errCode = couchstore_docinfos_by_id(db, ids, itms.size(),
482                                        0, getMultiCbC, &ctx);
483    if (errCode != COUCHSTORE_SUCCESS) {
484        st.numGetFailure.fetch_add(numItems);
485        for (itr = itms.begin(); itr != itms.end(); ++itr) {
486            LOG(EXTENSION_LOG_WARNING, "Warning: failed to read database by"
487                " vBucketId = %d key = %s file = %s error = %s [%s]\n",
488                vb, (*itr).first.c_str(),
489                dbFile.c_str(), couchstore_strerror(errCode),
490                couchkvstore_strerrno(db, errCode).c_str());
491            std::list<VBucketBGFetchItem *> &fetches = (*itr).second;
492            std::list<VBucketBGFetchItem *>::iterator fitr = fetches.begin();
493            for (; fitr != fetches.end(); ++fitr) {
494                (*fitr)->value.setStatus(couchErr2EngineErr(errCode));
495            }
496        }
497    }
498    closeDatabaseHandle(db);
499    delete []ids;
500}
501
502void CouchKVStore::del(const Item &itm,
503                       Callback<int> &cb)
504{
505    cb_assert(!isReadOnly());
506    cb_assert(intransaction);
507    uint16_t fileRev = dbFileRevMap[itm.getVBucketId()];
508    CouchRequestCallback requestcb;
509    requestcb.delCb = &cb;
510    CouchRequest *req = new CouchRequest(itm, fileRev, requestcb, true);
511    pendingReqsQ.push_back(req);
512}
513
514bool CouchKVStore::delVBucket(uint16_t vbucket, bool recreate)
515{
516    cb_assert(!isReadOnly());
517    cb_assert(couchNotifier);
518    RememberingCallback<bool> cb;
519
520    couchNotifier->delVBucket(vbucket, cb);
521    cb.waitForValue();
522
523    if (recreate) {
524        vbucket_state vbstate(vbucket_state_dead, 0, 0, 0);
525        vbucket_map_t::iterator it = cachedVBStates.find(vbucket);
526        if (it != cachedVBStates.end()) {
527            vbstate.state = it->second.state;
528        }
529        cachedVBStates[vbucket] = vbstate;
530        resetVBucket(vbucket, vbstate);
531    } else {
532        cachedVBStates.erase(vbucket);
533    }
534    updateDbFileMap(vbucket, 1);
535    return cb.val;
536}
537
538vbucket_map_t CouchKVStore::listPersistedVbuckets()
539{
540    std::vector<std::string> files;
541
542    if (!dbFileRevMapPopulated) {
543        // warmup, first discover db files from local directory
544        discoverDbFiles(dbname, files);
545        populateFileNameMap(files);
546    }
547
548    if (!cachedVBStates.empty()) {
549        cachedVBStates.clear();
550    }
551
552    Db *db = NULL;
553    couchstore_error_t errorCode;
554    for (uint16_t id = 0; id < numDbFiles; id++) {
555        uint64_t rev = dbFileRevMap[id];
556        errorCode = openDB(id, rev, &db, COUCHSTORE_OPEN_FLAG_RDONLY);
557        if (errorCode != COUCHSTORE_SUCCESS) {
558            std::stringstream revnum, vbid;
559            revnum  << rev;
560            vbid << id;
561            std::string fileName =
562                dbname + "/" + vbid.str() + ".couch." + revnum.str();
563            LOG(EXTENSION_LOG_WARNING,
564                "Warning: failed to open database file, name=%s\n",
565                fileName.c_str());
566            remVBucketFromDbFileMap(id);
567        } else {
568            vbucket_state vb_state;
569
570            /* read state of VBucket from db file */
571            readVBState(db, id, vb_state);
572            /* insert populated state to the array to return to the caller */
573            cachedVBStates[id] = vb_state;
574            /* update stat */
575            ++st.numLoadedVb;
576            closeDatabaseHandle(db);
577        }
578        db = NULL;
579        removeCompactFile(dbname, id, rev);
580    }
581
582    return cachedVBStates;
583}
584
585void CouchKVStore::getPersistedStats(std::map<std::string, std::string> &stats)
586{
587    char *buffer = NULL;
588    std::string fname = dbname + "/stats.json";
589    if (access(fname.c_str(), R_OK) == -1) {
590        return ;
591    }
592
593    std::ifstream session_stats;
594    session_stats.exceptions (session_stats.failbit | session_stats.badbit);
595    try {
596        session_stats.open(fname.c_str(), std::ios::binary);
597        session_stats.seekg(0, std::ios::end);
598        int flen = session_stats.tellg();
599        if (flen < 0) {
600            LOG(EXTENSION_LOG_WARNING,
601                "Warning: error in session stats ifstream!!!");
602            session_stats.close();
603            return;
604        }
605        session_stats.seekg(0, std::ios::beg);
606        buffer = new char[flen + 1];
607        session_stats.read(buffer, flen);
608        session_stats.close();
609        buffer[flen] = '\0';
610
611        cJSON *json_obj = cJSON_Parse(buffer);
612        if (!json_obj) {
613            LOG(EXTENSION_LOG_WARNING,
614                "Warning: failed to parse the session stats json doc!!!");
615            delete[] buffer;
616            return;
617        }
618
619        int json_arr_size = cJSON_GetArraySize(json_obj);
620        for (int i = 0; i < json_arr_size; ++i) {
621            cJSON *obj = cJSON_GetArrayItem(json_obj, i);
622            if (obj) {
623                stats[obj->string] = obj->valuestring ? obj->valuestring : "";
624            }
625        }
626        cJSON_Delete(json_obj);
627
628    } catch (const std::ifstream::failure &e) {
629        LOG(EXTENSION_LOG_WARNING,
630            "Warning: failed to load the engine session stats "
631            " due to IO exception \"%s\"", e.what());
632    } catch (...) {
633        LOG(EXTENSION_LOG_WARNING,
634            "Warning: failed to load the engine session stats "
635            " due to IO exception");
636    }
637
638    delete[] buffer;
639}
640
/**
 * Compose the path of a vbucket database file.
 *
 * @param dbname directory holding the database files
 * @param vbid   vbucket id
 * @param rev    file revision number
 * @return "<dbname>/<vbid>.couch.<rev>"
 */
static std::string getDBFileName(const std::string &dbname,
                                 uint16_t vbid,
                                 uint64_t rev)
{
    std::ostringstream path;
    path << dbname;
    path << "/" << vbid;
    path << ".couch." << rev;
    return path.str();
}
649
650static int time_purge_hook(Db* d, DocInfo* info, void* ctx_p) {
651    compaction_ctx* ctx = (compaction_ctx*) ctx_p;
652
653    //Compaction finished
654    if (info == NULL) {
655        return couchstore_set_purge_seq(d, ctx->max_purged_seq);
656    }
657
658    if (info->rev_meta.size >= DEFAULT_META_LEN) {
659        uint32_t exptime;
660        memcpy(&exptime, info->rev_meta.buf + 8, 4);
661        exptime = ntohl(exptime);
662        if (info->deleted) {
663            if (!ctx->drop_deletes) { // caller wants to retain deleted items
664                return COUCHSTORE_COMPACT_KEEP_ITEM;
665            }
666            if (exptime < ctx->purge_before_ts &&
667                    (!ctx->purge_before_seq ||
668                     info->db_seq <= ctx->purge_before_seq)) {
669                if (ctx->max_purged_seq < info->db_seq) {
670                    ctx->max_purged_seq = info->db_seq;
671                }
672                return COUCHSTORE_COMPACT_DROP_ITEM;
673            }
674        } else if (exptime && exptime < ctx->curr_time) {
675            std::string keyStr(info->id.buf, info->id.size);
676            expiredItemCtx expItem = { info->rev_seq, keyStr };
677            ctx->expiredItems.push_back(expItem);
678        }
679    }
680
681    return COUCHSTORE_COMPACT_KEEP_ITEM;
682}
683
684bool CouchKVStore::compactVBucket(const uint16_t vbid,
685                                  compaction_ctx *hook_ctx,
686                                  Callback<compaction_ctx> &cb,
687                                  Callback<kvstats_ctx> &kvcb) {
688    couchstore_compact_hook       hook = time_purge_hook;
689    const couch_file_ops     *def_iops = couchstore_get_default_file_ops();
690    Db                      *compactdb = NULL;
691    Db                       *targetDb = NULL;
692    uint64_t                   fileRev = dbFileRevMap[vbid];
693    uint64_t                   new_rev = fileRev + 1;
694    couchstore_error_t         errCode = COUCHSTORE_SUCCESS;
695    hrtime_t                     start = gethrtime();
696    uint64_t              newHeaderPos = 0;
697    std::string                 dbfile;
698    std::string           compact_file;
699    std::string               new_file;
700    kvstats_ctx                  kvctx;
701    DbInfo                        info;
702
703    // Open the source VBucket database file ...
704    errCode = openDB(vbid, fileRev, &compactdb,
705                     (uint64_t)COUCHSTORE_OPEN_FLAG_RDONLY, NULL);
706    if (errCode != COUCHSTORE_SUCCESS) {
707        LOG(EXTENSION_LOG_WARNING,
708                "Warning: failed to open database, vbucketId = %d "
709                "fileRev = %llu", vbid, fileRev);
710        return notifyCompaction(vbid, new_rev, VB_COMPACT_OPENDB_ERROR, 0);
711    }
712
713    // Build the temporary vbucket.compact file name
714    dbfile       = getDBFileName(dbname, vbid, fileRev);
715    compact_file = dbfile + ".compact";
716
717    // Perform COMPACTION of vbucket.couch.rev into vbucket.couch.rev.compact
718    errCode = couchstore_compact_db_ex(compactdb, compact_file.c_str(), 0,
719                                       hook, hook_ctx, def_iops);
720    if (errCode != COUCHSTORE_SUCCESS) {
721        LOG(EXTENSION_LOG_WARNING,
722            "Warning: failed to compact database with name=%s "
723            "error=%s errno=%s",
724            dbfile.c_str(),
725            couchstore_strerror(errCode),
726            couchkvstore_strerrno(compactdb, errCode).c_str());
727        closeDatabaseHandle(compactdb);
728
729        return notifyCompaction(vbid, new_rev, VB_COMPACT_OPENDB_ERROR, 0);
730    }
731
732    // Close the source Database File once compaction is done
733    closeDatabaseHandle(compactdb);
734
735    // Rename the .compact file to one with the next revision number
736    new_file = getDBFileName(dbname, vbid, new_rev);
737    if (rename(compact_file.c_str(), new_file.c_str()) != 0) {
738        LOG(EXTENSION_LOG_WARNING,
739            "Warning: failed to rename '%s' to '%s': %s",
740            compact_file.c_str(), new_file.c_str(),
741            getSystemStrerror().c_str());
742
743        removeCompactFile(compact_file);
744        return notifyCompaction(vbid, new_rev, VB_COMPACT_RENAME_ERROR, 0);
745    }
746
747    // Open the newly compacted VBucket database file ...
748    errCode = openDB(vbid, new_rev, &targetDb,
749                     (uint64_t)COUCHSTORE_OPEN_FLAG_RDONLY, NULL);
750    if (errCode != COUCHSTORE_SUCCESS) {
751        LOG(EXTENSION_LOG_WARNING,
752                "Warning: failed to open compacted database file %s "
753                "fileRev = %llu", new_file.c_str(), new_rev);
754        if (remove(new_file.c_str()) != 0) {
755            LOG(EXTENSION_LOG_WARNING, NULL,
756                "Warning: Failed to remove '%s': %s",
757                new_file.c_str(), getSystemStrerror().c_str());
758        }
759        return notifyCompaction(vbid, new_rev, VB_COMPACT_OPENDB_ERROR, 0);
760    }
761
762    // Update the global VBucket file map so all operations use the new file
763    updateDbFileMap(vbid, new_rev);
764
765    LOG(EXTENSION_LOG_INFO,
766            "INFO: created new couch db file, name=%s rev=%llu",
767            new_file.c_str(), new_rev);
768
769    // Update stats to caller
770    kvctx.vbucket = vbid;
771    couchstore_db_info(targetDb, &info);
772    kvctx.fileSpaceUsed = info.space_used;
773    kvctx.fileSize = info.file_size;
774    kvcb.callback(kvctx);
775
776    // Notify MCCouch that compaction is Done...
777    newHeaderPos = couchstore_get_header_position(targetDb);
778    closeDatabaseHandle(targetDb);
779
780    bool retVal = notifyCompaction(vbid, new_rev, VB_COMPACTION_DONE,
781                                   newHeaderPos);
782
783    if (hook_ctx->expiredItems.size()) {
784        cb.callback(*hook_ctx);
785    }
786
787    st.compactHisto.add((gethrtime() - start) / 1000);
788    return retVal;
789}
790
791bool CouchKVStore::notifyCompaction(const uint16_t vbid, uint64_t new_rev,
792                                    uint32_t result, uint64_t header_pos) {
793    RememberingCallback<uint16_t> lcb;
794
795    VBStateNotification vbs(0, 0, result, vbid);
796
797    couchNotifier->notify_update(vbs, new_rev, header_pos, lcb);
798    if (lcb.val != PROTOCOL_BINARY_RESPONSE_SUCCESS) {
799        LOG(EXTENSION_LOG_WARNING,
800                "Warning: compactor failed to notify mccouch on vbucket "
801                "%d. err %d", vbid, lcb.val);
802        return false;
803    }
804    return true;
805}
806
807bool CouchKVStore::snapshotVBuckets(const vbucket_map_t &vbstates,
808                                    Callback<kvstats_ctx> *cb)
809{
810    cb_assert(!isReadOnly());
811    bool success = true;
812
813    vbucket_map_t::const_reverse_iterator iter = vbstates.rbegin();
814    for (; iter != vbstates.rend(); ++iter) {
815        bool notify = false;
816        uint16_t vbucketId = iter->first;
817        vbucket_state vbstate = iter->second;
818        vbucket_map_t::iterator it = cachedVBStates.find(vbucketId);
819        uint32_t vb_change_type = VB_NO_CHANGE;
820        if (it != cachedVBStates.end()) {
821            if (it->second.state != vbstate.state) {
822                vb_change_type |= VB_STATE_CHANGED;
823                notify = true;
824            }
825            if (it->second.checkpointId != vbstate.checkpointId) {
826                vb_change_type |= VB_CHECKPOINT_CHANGED;
827                notify = true;
828            }
829
830            if (it->second.failovers.compare(vbstate.failovers) == 0 &&
831                vb_change_type == VB_NO_CHANGE) {
832                continue; // no changes
833            }
834            it->second.state = vbstate.state;
835            it->second.checkpointId = vbstate.checkpointId;
836            it->second.failovers = vbstate.failovers;
837            // Note that the max deleted seq number is maintained within CouchKVStore
838            vbstate.maxDeletedSeqno = it->second.maxDeletedSeqno;
839        } else {
840            vb_change_type = VB_STATE_CHANGED;
841            cachedVBStates[vbucketId] = vbstate;
842            notify = true;
843        }
844
845        success = setVBucketState(vbucketId, vbstate, vb_change_type, cb,
846                                  notify);
847        if (!success) {
848            LOG(EXTENSION_LOG_WARNING,
849                "Warning: failed to set new state, %s, for vbucket %d\n",
850                VBucket::toString(vbstate.state), vbucketId);
851            break;
852        }
853    }
854    return success;
855}
856
857bool CouchKVStore::snapshotStats(const std::map<std::string, std::string> &stats)
858{
859    cb_assert(!isReadOnly());
860    size_t count = 0;
861    size_t size = stats.size();
862    std::stringstream stats_buf;
863    stats_buf << "{";
864    std::map<std::string, std::string>::const_iterator it = stats.begin();
865    for (; it != stats.end(); ++it) {
866        stats_buf << "\"" << it->first << "\": \"" << it->second << "\"";
867        ++count;
868        if (count < size) {
869            stats_buf << ", ";
870        }
871    }
872    stats_buf << "}";
873
874    // TODO: This stats json should be written into the master database. However,
875    // we don't support the write synchronization between CouchKVStore in C++ and
876    // compaction manager in the erlang side for the master database yet. At this time,
877    // we simply log the engine stats into a separate json file. As part of futhre work,
878    // we need to get rid of the tight coupling between those two components.
879    bool rv = true;
880    std::string next_fname = dbname + "/stats.json.new";
881    std::ofstream new_stats;
882    new_stats.exceptions (new_stats.failbit | new_stats.badbit);
883    try {
884        new_stats.open(next_fname.c_str());
885        new_stats << stats_buf.str().c_str() << std::endl;
886        new_stats.flush();
887        new_stats.close();
888    } catch (const std::ofstream::failure& e) {
889        LOG(EXTENSION_LOG_WARNING, "Warning: failed to log the engine stats to "
890            "file \"%s\" due to IO exception \"%s\"; Not critical because new "
891            "stats will be dumped later, please ignore.",
892            next_fname.c_str(), e.what());
893        rv = false;
894    }
895
896    if (rv) {
897        std::string old_fname = dbname + "/stats.json.old";
898        std::string stats_fname = dbname + "/stats.json";
899        if (access(old_fname.c_str(), F_OK) == 0 && remove(old_fname.c_str()) != 0) {
900            LOG(EXTENSION_LOG_WARNING, "FATAL: Failed to remove '%s': %s",
901                old_fname.c_str(), strerror(errno));
902            remove(next_fname.c_str());
903            rv = false;
904        } else if (access(stats_fname.c_str(), F_OK) == 0 &&
905                   rename(stats_fname.c_str(), old_fname.c_str()) != 0) {
906            LOG(EXTENSION_LOG_WARNING,
907                "Warning: failed to rename '%s' to '%s': %s",
908                stats_fname.c_str(), old_fname.c_str(), strerror(errno));
909            remove(next_fname.c_str());
910            rv = false;
911        } else if (rename(next_fname.c_str(), stats_fname.c_str()) != 0) {
912            LOG(EXTENSION_LOG_WARNING,
913                "Warning: failed to rename '%s' to '%s': %s",
914                next_fname.c_str(), stats_fname.c_str(), strerror(errno));
915            remove(next_fname.c_str());
916            rv = false;
917        }
918    }
919
920    return rv;
921}
922
923bool CouchKVStore::setVBucketState(uint16_t vbucketId, vbucket_state &vbstate,
924                                   uint32_t vb_change_type,
925                                   Callback<kvstats_ctx> *kvcb,
926                                   bool notify)
927{
928    Db *db = NULL;
929    uint64_t fileRev, newFileRev;
930    std::stringstream id;
931    std::string dbFileName;
932    std::map<uint16_t, uint64_t>::iterator mapItr;
933    kvstats_ctx kvctx;
934    kvctx.vbucket = vbucketId;
935
936    id << vbucketId;
937    dbFileName = dbname + "/" + id.str() + ".couch." + id.str();
938    fileRev = dbFileRevMap[vbucketId];
939
940    couchstore_error_t errorCode;
941    errorCode = openDB(vbucketId, fileRev, &db,
942            (uint64_t)COUCHSTORE_OPEN_FLAG_CREATE, &newFileRev);
943    if (errorCode != COUCHSTORE_SUCCESS) {
944        ++st.numVbSetFailure;
945        LOG(EXTENSION_LOG_WARNING,
946                "Warning: failed to open database, name=%s",
947                dbFileName.c_str());
948        return false;
949    }
950
951    fileRev = newFileRev;
952    errorCode = saveVBState(db, vbstate);
953    if (errorCode != COUCHSTORE_SUCCESS) {
954        ++st.numVbSetFailure;
955        LOG(EXTENSION_LOG_WARNING,
956                "Warning: failed to save local doc, name=%s",
957                dbFileName.c_str());
958        closeDatabaseHandle(db);
959        return false;
960    }
961
962    errorCode = couchstore_commit(db);
963    if (errorCode != COUCHSTORE_SUCCESS) {
964        ++st.numVbSetFailure;
965        LOG(EXTENSION_LOG_WARNING,
966                "Warning: commit failed, vbid=%u rev=%llu error=%s [%s]",
967                vbucketId, fileRev, couchstore_strerror(errorCode),
968                couchkvstore_strerrno(db, errorCode).c_str());
969        closeDatabaseHandle(db);
970        return false;
971    } else if (notify) {
972        uint64_t newHeaderPos = couchstore_get_header_position(db);
973        RememberingCallback<uint16_t> lcb;
974
975        VBStateNotification vbs(vbstate.checkpointId, vbstate.state,
976                vb_change_type, vbucketId);
977
978        couchNotifier->notify_update(vbs, fileRev, newHeaderPos, lcb);
979        if (lcb.val != PROTOCOL_BINARY_RESPONSE_SUCCESS) {
980            cb_assert(lcb.val != PROTOCOL_BINARY_RESPONSE_ETMPFAIL);
981            LOG(EXTENSION_LOG_WARNING,
982                    "Warning: failed to notify CouchDB of update, "
983                    "vbid=%u rev=%llu error=0x%x\n",
984                    vbucketId, fileRev, lcb.val);
985            if (!epStats.isShutdown) {
986                closeDatabaseHandle(db);
987                return false;
988            }
989        }
990    }
991    if (kvcb) {
992        DbInfo info;
993        couchstore_db_info(db, &info);
994        kvctx.fileSpaceUsed = info.space_used;
995        kvctx.fileSize = info.file_size;
996        kvcb->callback(kvctx);
997    }
998    closeDatabaseHandle(db);
999
1000    return true;
1001}
1002
1003void CouchKVStore::dump(std::vector<uint16_t> &vbids,
1004                        shared_ptr<Callback<GetValue> > cb,
1005                        shared_ptr<Callback<CacheLookup> > cl)
1006{
1007    shared_ptr<Callback<SeqnoRange> > sr(new NoRangeCallback());
1008    std::vector<uint16_t>::iterator itr = vbids.begin();
1009    for (; itr != vbids.end(); ++itr) {
1010        loadDB(cb, cl, sr, false, *itr, 0, COUCHSTORE_NO_DELETES);
1011    }
1012}
1013
1014void CouchKVStore::dump(uint16_t vb, uint64_t stSeqno,
1015                        shared_ptr<Callback<GetValue> > cb,
1016                        shared_ptr<Callback<CacheLookup> > cl,
1017                        shared_ptr<Callback<SeqnoRange> > sr)
1018{
1019    loadDB(cb, cl, sr, false, vb, stSeqno);
1020}
1021
1022void CouchKVStore::dumpKeys(std::vector<uint16_t> &vbids,  shared_ptr<Callback<GetValue> > cb)
1023{
1024    shared_ptr<Callback<CacheLookup> > cl(new NoLookupCallback());
1025    shared_ptr<Callback<SeqnoRange> > sr(new NoRangeCallback());
1026    std::vector<uint16_t>::iterator itr = vbids.begin();
1027    for (; itr != vbids.end(); ++itr) {
1028        loadDB(cb, cl, sr, true, *itr, 0, COUCHSTORE_NO_DELETES);
1029    }
1030}
1031
1032void CouchKVStore::dumpDeleted(uint16_t vb, uint64_t stSeqno, uint64_t enSeqno,
1033                               shared_ptr<Callback<GetValue> > cb)
1034{
1035    std::vector<uint16_t> vbids;
1036    vbids.push_back(vb);
1037    shared_ptr<Callback<CacheLookup> > cl(new NoLookupCallback());
1038    shared_ptr<Callback<SeqnoRange> > sr(new NoRangeCallback());
1039    loadDB(cb, cl, sr, true, vb, stSeqno, COUCHSTORE_DELETES_ONLY);
1040}
1041
1042StorageProperties CouchKVStore::getStorageProperties()
1043{
1044    StorageProperties rv(true, true, true, true);
1045    return rv;
1046}
1047
1048bool CouchKVStore::commit(Callback<kvstats_ctx> *cb)
1049{
1050    cb_assert(!isReadOnly());
1051    if (intransaction) {
1052        intransaction = commit2couchstore(cb) ? false : true;
1053    }
1054    return !intransaction;
1055
1056}
1057
1058void CouchKVStore::addStats(const std::string &prefix,
1059                            ADD_STAT add_stat,
1060                            const void *c)
1061{
1062    const char *prefix_str = prefix.c_str();
1063
1064    /* stats for both read-only and read-write threads */
1065    addStat(prefix_str, "backend_type",   "couchstore",       add_stat, c);
1066    addStat(prefix_str, "open",           st.numOpen,         add_stat, c);
1067    addStat(prefix_str, "close",          st.numClose,        add_stat, c);
1068    addStat(prefix_str, "readTime",       st.readTimeHisto,   add_stat, c);
1069    addStat(prefix_str, "readSize",       st.readSizeHisto,   add_stat, c);
1070    addStat(prefix_str, "numLoadedVb",    st.numLoadedVb,     add_stat, c);
1071
1072    // failure stats
1073    addStat(prefix_str, "failure_open",   st.numOpenFailure, add_stat, c);
1074    addStat(prefix_str, "failure_get",    st.numGetFailure,  add_stat, c);
1075
1076    if (!isReadOnly()) {
1077        addStat(prefix_str, "failure_set",   st.numSetFailure,   add_stat, c);
1078        addStat(prefix_str, "failure_del",   st.numDelFailure,   add_stat, c);
1079        addStat(prefix_str, "failure_vbset", st.numVbSetFailure, add_stat, c);
1080        addStat(prefix_str, "lastCommDocs",  st.docsCommitted,   add_stat, c);
1081
1082        // stats for CouchNotifier
1083        couchNotifier->addStats(prefix, add_stat, c);
1084    }
1085}
1086
1087void CouchKVStore::addTimingStats(const std::string &prefix,
1088                                  ADD_STAT add_stat, const void *c) {
1089    if (isReadOnly()) {
1090        return;
1091    }
1092    const char *prefix_str = prefix.c_str();
1093    addStat(prefix_str, "commit",      st.commitHisto,      add_stat, c);
1094    addStat(prefix_str, "compact",     st.compactHisto,     add_stat, c);
1095    addStat(prefix_str, "delete",      st.delTimeHisto,     add_stat, c);
1096    addStat(prefix_str, "save_documents", st.saveDocsHisto, add_stat, c);
1097    addStat(prefix_str, "writeTime",   st.writeTimeHisto,   add_stat, c);
1098    addStat(prefix_str, "writeSize",   st.writeSizeHisto,   add_stat, c);
1099    addStat(prefix_str, "bulkSize",    st.batchSize,        add_stat, c);
1100
1101    // Couchstore file ops stats
1102    addStat(prefix_str, "fsReadTime",  st.fsStats.readTimeHisto,  add_stat, c);
1103    addStat(prefix_str, "fsWriteTime", st.fsStats.writeTimeHisto, add_stat, c);
1104    addStat(prefix_str, "fsSyncTime",  st.fsStats.syncTimeHisto,  add_stat, c);
1105    addStat(prefix_str, "fsReadSize",  st.fsStats.readSizeHisto,  add_stat, c);
1106    addStat(prefix_str, "fsWriteSize", st.fsStats.writeSizeHisto, add_stat, c);
1107    addStat(prefix_str, "fsReadSeek",  st.fsStats.readSeekHisto,  add_stat, c);
1108}
1109
1110template <typename T>
1111void CouchKVStore::addStat(const std::string &prefix, const char *stat, T &val,
1112                           ADD_STAT add_stat, const void *c)
1113{
1114    std::stringstream fullstat;
1115    fullstat << prefix << ":" << stat;
1116    add_casted_stat(fullstat.str().c_str(), val, add_stat, c);
1117}
1118
1119void CouchKVStore::optimizeWrites(std::vector<queued_item> &items)
1120{
1121    cb_assert(!isReadOnly());
1122    if (items.empty()) {
1123        return;
1124    }
1125    CompareQueuedItemsBySeqnoAndKey cq;
1126    std::sort(items.begin(), items.end(), cq);
1127}
1128
1129void CouchKVStore::loadDB(shared_ptr<Callback<GetValue> > cb,
1130                          shared_ptr<Callback<CacheLookup> > cl,
1131                          shared_ptr<Callback<SeqnoRange> > sr,
1132                          bool keysOnly, uint16_t vbid,
1133                          uint64_t startSeqno,
1134                          couchstore_docinfos_options options)
1135{
1136    if (!dbFileRevMapPopulated) {
1137        // warmup, first discover db files from local directory
1138        std::vector<std::string> files;
1139        discoverDbFiles(dbname, files);
1140        populateFileNameMap(files);
1141    }
1142
1143    Db *db = NULL;
1144    uint64_t rev = dbFileRevMap[vbid];
1145    couchstore_error_t errorCode = openDB(vbid, rev, &db,
1146                                          COUCHSTORE_OPEN_FLAG_RDONLY);
1147    if (errorCode != COUCHSTORE_SUCCESS) {
1148        LOG(EXTENSION_LOG_WARNING,
1149            "Failed to open database, name=%s/%d.couch.%lu",
1150            dbname.c_str(), vbid, rev);
1151        remVBucketFromDbFileMap(vbid);
1152    } else {
1153        DbInfo info;
1154        errorCode = couchstore_db_info(db, &info);
1155        if (errorCode != COUCHSTORE_SUCCESS) {
1156            LOG(EXTENSION_LOG_WARNING, "Failed to read DB info for backfill");
1157            closeDatabaseHandle(db);
1158            abort();
1159        }
1160        SeqnoRange range(startSeqno, info.last_sequence);
1161        sr->callback(range);
1162
1163        LoadResponseCtx ctx;
1164        ctx.vbucketId = vbid;
1165        ctx.keysonly = keysOnly;
1166        ctx.callback = cb;
1167        ctx.lookup = cl;
1168        ctx.stats = &epStats;
1169        errorCode = couchstore_changes_since(db, startSeqno, options,
1170                                             recordDbDumpC,
1171                                             static_cast<void *>(&ctx));
1172        if (errorCode != COUCHSTORE_SUCCESS) {
1173            if (errorCode == COUCHSTORE_ERROR_CANCEL) {
1174                LOG(EXTENSION_LOG_WARNING,
1175                    "Canceling loading database, warmup has completed\n");
1176            } else {
1177                LOG(EXTENSION_LOG_WARNING,
1178                    "couchstore_changes_since failed, error=%s [%s]",
1179                    couchstore_strerror(errorCode),
1180                    couchkvstore_strerrno(db, errorCode).c_str());
1181                remVBucketFromDbFileMap(vbid);
1182            }
1183        }
1184        closeDatabaseHandle(db);
1185    }
1186}
1187
1188void CouchKVStore::open()
1189{
1190    // TODO intransaction, is it needed?
1191    intransaction = false;
1192    if (!isReadOnly()) {
1193        couchNotifier = CouchNotifier::create(epStats, configuration);
1194    }
1195
1196    struct stat dbstat;
1197    bool havedir = false;
1198
1199    if (stat(dbname.c_str(), &dbstat) == 0 && (dbstat.st_mode & S_IFDIR) == S_IFDIR) {
1200        havedir = true;
1201    }
1202
1203    if (!havedir) {
1204        if (mkdir(dbname.c_str(), S_IRWXU) == -1) {
1205            std::stringstream ss;
1206            ss << "Warning: Failed to create data directory ["
1207               << dbname << "]: " << strerror(errno);
1208            throw std::runtime_error(ss.str());
1209        }
1210    }
1211}
1212
1213void CouchKVStore::close()
1214{
1215    intransaction = false;
1216    if (!isReadOnly()) {
1217        CouchNotifier::deleteNotifier();
1218    }
1219    couchNotifier = NULL;
1220}
1221
1222uint64_t CouchKVStore::checkNewRevNum(std::string &dbFileName, bool newFile)
1223{
1224    uint64_t newrev = 0;
1225    std::string nameKey;
1226
1227    if (!newFile) {
1228        // extract out the file revision number first
1229        size_t secondDot = dbFileName.rfind(".");
1230        nameKey = dbFileName.substr(0, secondDot);
1231    } else {
1232        nameKey = dbFileName;
1233    }
1234    nameKey.append(".");
1235    const std::vector<std::string> files = findFilesWithPrefix(nameKey);
1236    std::vector<std::string>::const_iterator itor;
1237    // found file(s) whoes name has the same key name pair with different
1238    // revision number
1239    for (itor = files.begin(); itor != files.end(); ++itor) {
1240        const std::string &filename = *itor;
1241        if (endWithCompact(filename)) {
1242            continue;
1243        }
1244
1245        size_t secondDot = filename.rfind(".");
1246        char *ptr = NULL;
1247        uint64_t revnum = strtoull(filename.substr(secondDot + 1).c_str(), &ptr, 10);
1248        if (newrev < revnum) {
1249            newrev = revnum;
1250            dbFileName = filename;
1251        }
1252    }
1253    return newrev;
1254}
1255
1256void CouchKVStore::updateDbFileMap(uint16_t vbucketId, uint64_t newFileRev)
1257{
1258    if (vbucketId >= numDbFiles) {
1259        LOG(EXTENSION_LOG_WARNING, NULL,
1260            "Warning: cannot update db file map for an invalid vbucket, "
1261            "vbucket id = %d, rev = %lld\n", vbucketId, newFileRev);
1262        return;
1263    }
1264
1265    dbFileRevMap[vbucketId] = newFileRev;
1266}
1267
1268couchstore_error_t CouchKVStore::openDB(uint16_t vbucketId,
1269                                        uint64_t fileRev,
1270                                        Db **db,
1271                                        uint64_t options,
1272                                        uint64_t *newFileRev)
1273{
1274    std::string dbFileName = getDBFileName(dbname, vbucketId, fileRev);
1275    couch_file_ops* ops = &statCollectingFileOps;
1276
1277    uint64_t newRevNum = fileRev;
1278    couchstore_error_t errorCode = COUCHSTORE_SUCCESS;
1279
1280    if (options == COUCHSTORE_OPEN_FLAG_CREATE) {
1281        // first try to open the requested file without the create option
1282        // in case it does already exist
1283        errorCode = couchstore_open_db_ex(dbFileName.c_str(), 0, ops, db);
1284        if (errorCode != COUCHSTORE_SUCCESS) {
1285            // open_db failed but still check if the file exists
1286            newRevNum = checkNewRevNum(dbFileName);
1287            bool fileExists = (newRevNum) ? true : false;
1288            if (fileExists) {
1289                errorCode = openDB_retry(dbFileName, 0, ops, db, &newRevNum);
1290            } else {
1291                // requested file doesn't seem to exist, just create one
1292                errorCode = couchstore_open_db_ex(dbFileName.c_str(), options,
1293                                                  ops, db);
1294                if (errorCode == COUCHSTORE_SUCCESS) {
1295                    newRevNum = 1;
1296                    updateDbFileMap(vbucketId, fileRev);
1297                    LOG(EXTENSION_LOG_INFO,
1298                        "INFO: created new couch db file, name=%s rev=%llu",
1299                        dbFileName.c_str(), fileRev);
1300                }
1301            }
1302        }
1303    } else {
1304        errorCode = openDB_retry(dbFileName, options, ops, db, &newRevNum);
1305    }
1306
1307    /* update command statistics */
1308    st.numOpen++;
1309    if (errorCode) {
1310        st.numOpenFailure++;
1311        LOG(EXTENSION_LOG_WARNING, "Warning: couchstore_open_db failed, name=%s"
1312            " option=%X rev=%llu error=%s [%s]\n", dbFileName.c_str(), options,
1313            ((newRevNum > fileRev) ? newRevNum : fileRev),
1314            couchstore_strerror(errorCode),
1315            getSystemStrerror().c_str());
1316    } else {
1317        if (newRevNum > fileRev) {
1318            // new revision number found, update it
1319            updateDbFileMap(vbucketId, newRevNum);
1320        }
1321    }
1322
1323    if (newFileRev != NULL) {
1324        *newFileRev = (newRevNum > fileRev) ? newRevNum : fileRev;
1325    }
1326    return errorCode;
1327}
1328
1329couchstore_error_t CouchKVStore::openDB_retry(std::string &dbfile,
1330                                              uint64_t options,
1331                                              const couch_file_ops *ops,
1332                                              Db** db, uint64_t *newFileRev)
1333{
1334    int retry = 0;
1335    couchstore_error_t errCode = COUCHSTORE_SUCCESS;
1336
1337    while (retry < MAX_OPEN_DB_RETRY) {
1338        errCode = couchstore_open_db_ex(dbfile.c_str(), options, ops, db);
1339        if (errCode == COUCHSTORE_SUCCESS) {
1340            return errCode;
1341        }
1342        LOG(EXTENSION_LOG_INFO, "INFO: couchstore_open_db failed, name=%s "
1343            "options=%X error=%s [%s], try it again!",
1344            dbfile.c_str(), options, couchstore_strerror(errCode),
1345            getSystemStrerror().c_str());
1346        *newFileRev = checkNewRevNum(dbfile);
1347        ++retry;
1348        if (retry == MAX_OPEN_DB_RETRY - 1 && options == 0 &&
1349            errCode == COUCHSTORE_ERROR_NO_SUCH_FILE) {
1350            options = COUCHSTORE_OPEN_FLAG_CREATE;
1351        }
1352    }
1353    return errCode;
1354}
1355
1356void CouchKVStore::populateFileNameMap(std::vector<std::string> &filenames)
1357{
1358    std::vector<std::string>::iterator fileItr;
1359
1360    for (fileItr = filenames.begin(); fileItr != filenames.end(); ++fileItr) {
1361        const std::string &filename = *fileItr;
1362        size_t secondDot = filename.rfind(".");
1363        std::string nameKey = filename.substr(0, secondDot);
1364        size_t firstDot = nameKey.rfind(".");
1365        size_t firstSlash = nameKey.rfind("/");
1366
1367        std::string revNumStr = filename.substr(secondDot + 1);
1368        char *ptr = NULL;
1369        uint64_t revNum = strtoull(revNumStr.c_str(), &ptr, 10);
1370
1371        std::string vbIdStr = nameKey.substr(firstSlash + 1,
1372                                            (firstDot - firstSlash) - 1);
1373        if (allDigit(vbIdStr)) {
1374            int vbId = atoi(vbIdStr.c_str());
1375            uint64_t old_rev_num = dbFileRevMap[vbId];
1376            if (old_rev_num == revNum) {
1377                continue;
1378            } else if (old_rev_num < revNum) { // stale revision found
1379                dbFileRevMap[vbId] = revNum;
1380            } else { // stale file found (revision id has rolled over)
1381                old_rev_num = revNum;
1382            }
1383            std::stringstream old_file;
1384            old_file << dbname << "/" << vbId << ".couch." << old_rev_num;
1385            if (access(old_file.str().c_str(), F_OK) == 0) {
1386                if (remove(old_file.str().c_str()) != 0) {
1387                    LOG(EXTENSION_LOG_WARNING,
1388                        "Warning: Failed to remove the stale file '%s': %s",
1389                        old_file.str().c_str(), getSystemStrerror().c_str());
1390                } else {
1391                    LOG(EXTENSION_LOG_WARNING,
1392                        "Warning: Removed stale file '%s'",
1393                        old_file.str().c_str());
1394                }
1395            }
1396        } else {
1397            // skip non-vbucket database file, master.couch etc
1398            LOG(EXTENSION_LOG_DEBUG,
1399                "Non-vbucket database file, %s, skip adding "
1400                "to CouchKVStore dbFileMap\n", filename.c_str());
1401        }
1402    }
1403    dbFileRevMapPopulated = true;
1404}
1405
1406couchstore_error_t CouchKVStore::fetchDoc(Db *db, DocInfo *docinfo,
1407                                          GetValue &docValue, uint16_t vbId,
1408                                          bool metaOnly, bool fetchDelete)
1409{
1410    couchstore_error_t errCode = COUCHSTORE_SUCCESS;
1411    sized_buf metadata = docinfo->rev_meta;
1412    uint32_t itemFlags;
1413    uint64_t cas;
1414    time_t exptime;
1415    uint8_t *ext_meta = NULL;
1416    uint8_t ext_len;
1417
1418    cb_assert(metadata.size >= DEFAULT_META_LEN);
1419    if (metadata.size == DEFAULT_META_LEN) {
1420        memcpy(&cas, (metadata.buf), 8);
1421        memcpy(&exptime, (metadata.buf) + 8, 4);
1422        memcpy(&itemFlags, (metadata.buf) + 12, 4);
1423        ext_len = 0;
1424    } else {
1425        //metadata.size => 18, FLEX_META_CODE at offset 16
1426        memcpy(&cas, (metadata.buf), 8);
1427        memcpy(&exptime, (metadata.buf) + 8, 4);
1428        memcpy(&itemFlags, (metadata.buf) + 12, 4);
1429        ext_len = metadata.size - DEFAULT_META_LEN - FLEX_DATA_OFFSET;
1430        ext_meta = (uint8_t *)malloc(sizeof(uint8_t) * ext_len);
1431        memcpy(ext_meta, (metadata.buf) + DEFAULT_META_LEN + FLEX_DATA_OFFSET,
1432               ext_len);
1433    }
1434    cas = ntohll(cas);
1435    exptime = ntohl(exptime);
1436
1437    if (metaOnly || (fetchDelete && docinfo->deleted)) {
1438        Item *it = new Item(docinfo->id.buf, (size_t)docinfo->id.size,
1439                            docinfo->size, itemFlags, (time_t)exptime,
1440                            ext_meta, ext_len, cas, docinfo->db_seq, vbId);
1441        if (docinfo->deleted) {
1442            it->setDeleted();
1443        }
1444        it->setRevSeqno(docinfo->rev_seq);
1445        docValue = GetValue(it);
1446        // update ep-engine IO stats
1447        ++epStats.io_num_read;
1448        epStats.io_read_bytes.fetch_add(docinfo->id.size);
1449    } else {
1450        Doc *doc = NULL;
1451        errCode = couchstore_open_doc_with_docinfo(db, docinfo, &doc,
1452                                                   DECOMPRESS_DOC_BODIES);
1453        if (errCode == COUCHSTORE_SUCCESS) {
1454            if (docinfo->deleted) {
1455                // do not read a doc that is marked deleted, just return the
1456                // error code as not found but still release the document body.
1457                errCode = COUCHSTORE_ERROR_DOC_NOT_FOUND;
1458            } else {
1459                cb_assert(doc && (doc->id.size <= UINT16_MAX));
1460                size_t valuelen = doc->data.size;
1461                void *valuePtr = doc->data.buf;
1462                Item *it = new Item(docinfo->id.buf, (size_t)docinfo->id.size,
1463                                    itemFlags, (time_t)exptime, valuePtr, valuelen,
1464                                    ext_meta, ext_len, cas, docinfo->db_seq, vbId,
1465                                    docinfo->rev_seq);
1466                docValue = GetValue(it);
1467
1468                // update ep-engine IO stats
1469                ++epStats.io_num_read;
1470                epStats.io_read_bytes.fetch_add(docinfo->id.size + valuelen);
1471            }
1472            couchstore_free_document(doc);
1473        }
1474    }
1475    free(ext_meta);
1476    return errCode;
1477}
1478
1479int CouchKVStore::recordDbDump(Db *db, DocInfo *docinfo, void *ctx)
1480{
1481    LoadResponseCtx *loadCtx = (LoadResponseCtx *)ctx;
1482    shared_ptr<Callback<GetValue> > cb = loadCtx->callback;
1483    shared_ptr<Callback<CacheLookup> > cl = loadCtx->lookup;
1484
1485    Doc *doc = NULL;
1486    void *valuePtr = NULL;
1487    size_t valuelen = 0;
1488    uint64_t byseqno = docinfo->db_seq;
1489    sized_buf  metadata = docinfo->rev_meta;
1490    uint16_t vbucketId = loadCtx->vbucketId;
1491    sized_buf key = docinfo->id;
1492    uint32_t itemflags;
1493    uint64_t cas;
1494    uint32_t exptime;
1495    uint8_t *ext_meta = NULL;
1496    uint8_t ext_len;
1497
1498    cb_assert(key.size <= UINT16_MAX);
1499    cb_assert(metadata.size >= DEFAULT_META_LEN);
1500
1501    std::string docKey(docinfo->id.buf, docinfo->id.size);
1502    CacheLookup lookup(docKey, byseqno, vbucketId);
1503    cl->callback(lookup);
1504    if (cl->getStatus() == ENGINE_KEY_EEXISTS) {
1505        return COUCHSTORE_SUCCESS;
1506    }
1507
1508    if (metadata.size == DEFAULT_META_LEN) {
1509        memcpy(&cas, (metadata.buf), 8);
1510        memcpy(&exptime, (metadata.buf) + 8, 4);
1511        memcpy(&itemflags, (metadata.buf) + 12, 4);
1512        ext_len = 0;
1513    } else {
1514        //metadata.size > 16, FLEX_META_CODE at offset 16
1515        memcpy(&cas, (metadata.buf), 8);
1516        memcpy(&exptime, (metadata.buf) + 8, 4);
1517        memcpy(&itemflags, (metadata.buf) + 12, 4);
1518        ext_len = metadata.size - DEFAULT_META_LEN - FLEX_DATA_OFFSET;
1519        ext_meta = (uint8_t *)malloc(sizeof(uint8_t) * ext_len);
1520        memcpy(ext_meta, (metadata.buf) + DEFAULT_META_LEN + FLEX_DATA_OFFSET,
1521               ext_len);
1522    }
1523    exptime = ntohl(exptime);
1524    cas = ntohll(cas);
1525
1526    if (!loadCtx->keysonly && !docinfo->deleted) {
1527        couchstore_error_t errCode ;
1528        errCode = couchstore_open_doc_with_docinfo(db, docinfo, &doc,
1529                                                   DECOMPRESS_DOC_BODIES);
1530
1531        if (errCode == COUCHSTORE_SUCCESS) {
1532            if (doc->data.size) {
1533                valuelen = doc->data.size;
1534                valuePtr = doc->data.buf;
1535            }
1536        } else {
1537            LOG(EXTENSION_LOG_WARNING,
1538                "Warning: failed to retrieve key value from database "
1539                "database, vBucket=%d key=%s error=%s [%s]\n",
1540                vbucketId, key.buf, couchstore_strerror(errCode),
1541                couchkvstore_strerrno(db, errCode).c_str());
1542            free(ext_meta);
1543            return COUCHSTORE_SUCCESS;
1544        }
1545    }
1546
1547    Item *it = new Item((void *)key.buf,
1548                        key.size,
1549                        itemflags,
1550                        (time_t)exptime,
1551                        valuePtr, valuelen,
1552                        ext_meta, ext_len,
1553                        cas,
1554                        docinfo->db_seq, // return seq number being persisted on disk
1555                        vbucketId,
1556                        docinfo->rev_seq);
1557    if (docinfo->deleted) {
1558        it->setDeleted();
1559    }
1560
1561
1562    GetValue rv(it, ENGINE_SUCCESS, -1, loadCtx->keysonly);
1563    cb->callback(rv);
1564
1565    couchstore_free_document(doc);
1566
1567    free(ext_meta);
1568
1569    if (cb->getStatus() == ENGINE_ENOMEM) {
1570        return COUCHSTORE_ERROR_CANCEL;
1571    }
1572    return COUCHSTORE_SUCCESS;
1573}
1574
1575bool CouchKVStore::commit2couchstore(Callback<kvstats_ctx> *cb)
1576{
1577    bool success = true;
1578
1579    size_t pendingCommitCnt = pendingReqsQ.size();
1580    if (pendingCommitCnt == 0) {
1581        return success;
1582    }
1583
1584    Doc **docs = new Doc *[pendingCommitCnt];
1585    DocInfo **docinfos = new DocInfo *[pendingCommitCnt];
1586
1587    cb_assert(pendingReqsQ[0]);
1588    uint16_t vbucket2flush = pendingReqsQ[0]->getVBucketId();
1589    uint64_t fileRev = pendingReqsQ[0]->getRevNum();
1590    for (size_t i = 0; i < pendingCommitCnt; ++i) {
1591        CouchRequest *req = pendingReqsQ[i];
1592        cb_assert(req);
1593        docs[i] = req->getDbDoc();
1594        docinfos[i] = req->getDbDocInfo();
1595        cb_assert(vbucket2flush == req->getVBucketId());
1596    }
1597
1598    kvstats_ctx kvctx;
1599    kvctx.vbucket = vbucket2flush;
1600    // flush all
1601    couchstore_error_t errCode = saveDocs(vbucket2flush, fileRev, docs,
1602                                          docinfos, pendingCommitCnt, kvctx);
1603    if (errCode) {
1604        LOG(EXTENSION_LOG_WARNING,
1605            "Warning: commit failed, cannot save CouchDB docs "
1606            "for vbucket = %d rev = %llu\n", vbucket2flush, fileRev);
1607        ++epStats.commitFailed;
1608    }
1609    if (cb) {
1610        cb->callback(kvctx);
1611    }
1612    commitCallback(pendingReqsQ, kvctx, errCode);
1613
1614    // clean up
1615    for (size_t i = 0; i < pendingCommitCnt; ++i) {
1616        delete pendingReqsQ[i];
1617    }
1618    pendingReqsQ.clear();
1619    delete [] docs;
1620    delete [] docinfos;
1621    return success;
1622}
1623
1624static int readDocInfos(Db *db, DocInfo *docinfo, void *ctx)
1625{
1626    cb_assert(ctx);
1627    kvstats_ctx *cbCtx = static_cast<kvstats_ctx *>(ctx);
1628    if(docinfo) {
1629        // An item exists in the VB DB file.
1630        if (!docinfo->deleted) {
1631            std::string key(docinfo->id.buf, docinfo->id.size);
1632            unordered_map<std::string, kstat_entry_t>::iterator itr =
1633                cbCtx->keyStats.find(key);
1634            if (itr != cbCtx->keyStats.end()) {
1635                itr->second.first = true;
1636            }
1637        }
1638    }
1639    return 0;
1640}
1641
1642couchstore_error_t CouchKVStore::saveDocs(uint16_t vbid, uint64_t rev, Doc **docs,
1643                                          DocInfo **docinfos, size_t docCount,
1644                                          kvstats_ctx &kvctx)
1645{
1646    couchstore_error_t errCode;
1647    uint64_t fileRev = rev;
1648    DbInfo info;
1649    cb_assert(fileRev);
1650
1651    Db *db = NULL;
1652    uint64_t newFileRev;
1653    errCode = openDB(vbid, fileRev, &db, 0, &newFileRev);
1654    if (errCode != COUCHSTORE_SUCCESS) {
1655        LOG(EXTENSION_LOG_WARNING,
1656                "Warning: failed to open database, vbucketId = %d "
1657                "fileRev = %llu numDocs = %d", vbid, fileRev, docCount);
1658        return errCode;
1659    } else {
1660        uint64_t max = computeMaxDeletedSeqNum(docinfos, docCount);
1661
1662        // update max_deleted_seq in the local doc (vbstate)
1663        // before save docs for the given vBucket
1664        if (max > 0) {
1665            vbucket_map_t::iterator it =
1666                cachedVBStates.find(vbid);
1667            if (it != cachedVBStates.end() && it->second.maxDeletedSeqno < max) {
1668                it->second.maxDeletedSeqno = max;
1669                errCode = saveVBState(db, it->second);
1670                if (errCode != COUCHSTORE_SUCCESS) {
1671                    LOG(EXTENSION_LOG_WARNING,
1672                            "Warning: failed to save local doc for, "
1673                            "vBucket = %d numDocs = %d\n", vbid, docCount);
1674                    closeDatabaseHandle(db);
1675                    return errCode;
1676                }
1677            }
1678        }
1679
1680        sized_buf *ids = new sized_buf[docCount];
1681        for (size_t idx = 0; idx < docCount; idx++) {
1682            ids[idx] = docinfos[idx]->id;
1683            std::string key(ids[idx].buf, ids[idx].size);
1684            kvctx.keyStats[key] = std::make_pair(false,
1685                    !docinfos[idx]->deleted);
1686        }
1687        couchstore_docinfos_by_id(db, ids, (unsigned) docCount, 0,
1688                readDocInfos, &kvctx);
1689        delete[] ids;
1690
1691        hrtime_t cs_begin = gethrtime();
1692        uint64_t flags = COMPRESS_DOC_BODIES | COUCHSTORE_SEQUENCE_AS_IS;
1693        errCode = couchstore_save_documents(db, docs, docinfos,
1694                (unsigned) docCount, flags);
1695        st.saveDocsHisto.add((gethrtime() - cs_begin) / 1000);
1696        if (errCode != COUCHSTORE_SUCCESS) {
1697            LOG(EXTENSION_LOG_WARNING,
1698                    "Warning: failed to save docs to database, numDocs = %d "
1699                    "error=%s [%s]\n", docCount, couchstore_strerror(errCode),
1700                    couchkvstore_strerrno(db, errCode).c_str());
1701            closeDatabaseHandle(db);
1702            return errCode;
1703        }
1704
1705        cs_begin = gethrtime();
1706        errCode = couchstore_commit(db);
1707        st.commitHisto.add((gethrtime() - cs_begin) / 1000);
1708        if (errCode) {
1709            LOG(EXTENSION_LOG_WARNING,
1710                    "Warning: couchstore_commit failed, error=%s [%s]",
1711                    couchstore_strerror(errCode),
1712                    couchkvstore_strerrno(db, errCode).c_str());
1713            closeDatabaseHandle(db);
1714            return errCode;
1715        }
1716
1717        if (epStats.isShutdown) {
1718            // shutdown is in progress, no need to notify mccouch
1719            // the compactor must have already exited!
1720            closeDatabaseHandle(db);
1721            return errCode;
1722        }
1723
1724        RememberingCallback<uint16_t> cb;
1725        uint64_t newHeaderPos = couchstore_get_header_position(db);
1726        couchNotifier->notify_headerpos_update(vbid, newFileRev, newHeaderPos, cb);
1727        if (cb.val != PROTOCOL_BINARY_RESPONSE_SUCCESS) {
1728            cb_assert(cb.val != PROTOCOL_BINARY_RESPONSE_ETMPFAIL);
1729            LOG(EXTENSION_LOG_WARNING, "Warning: failed to notify "
1730                    "CouchDB of update for vbucket=%d, error=0x%x\n",
1731                    vbid, cb.val);
1732        }
1733        st.batchSize.add(docCount);
1734
1735        // retrieve storage system stats for file fragmentation computation
1736        couchstore_db_info(db, &info);
1737        kvctx.fileSpaceUsed = info.space_used;
1738        kvctx.fileSize = info.file_size;
1739        cachedDeleteCount[vbid] = info.deleted_count;
1740        cachedDocCount[vbid] = info.doc_count;
1741        closeDatabaseHandle(db);
1742    }
1743
1744    /* update stat */
1745    if(errCode == COUCHSTORE_SUCCESS) {
1746        st.docsCommitted = docCount;
1747    }
1748
1749    return errCode;
1750}
1751
1752void CouchKVStore::remVBucketFromDbFileMap(uint16_t vbucketId)
1753{
1754    if (vbucketId >= numDbFiles) {
1755        LOG(EXTENSION_LOG_WARNING, NULL,
1756            "Warning: cannot remove db file map entry for an invalid vbucket, "
1757            "vbucket id = %d\n", vbucketId);
1758        return;
1759    }
1760
1761    // just reset revision number of the requested vbucket
1762    dbFileRevMap[vbucketId] = 1;
1763}
1764
1765void CouchKVStore::commitCallback(std::vector<CouchRequest *> &committedReqs,
1766                                  kvstats_ctx &kvctx,
1767                                  couchstore_error_t errCode)
1768{
1769    size_t commitSize = committedReqs.size();
1770
1771    for (size_t index = 0; index < commitSize; index++) {
1772        size_t dataSize = committedReqs[index]->getNBytes();
1773        size_t keySize = committedReqs[index]->getKey().length();
1774        /* update ep stats */
1775        ++epStats.io_num_write;
1776        epStats.io_write_bytes.fetch_add(keySize + dataSize);
1777
1778        if (committedReqs[index]->isDelete()) {
1779            int rv = getMutationStatus(errCode);
1780            if (rv != -1) {
1781                const std::string &key = committedReqs[index]->getKey();
1782                if (kvctx.keyStats[key].first) {
1783                    rv = 1; // Deletion is for an existing item on DB file.
1784                } else {
1785                    rv = 0; // Deletion is for a non-existing item on DB file.
1786                }
1787            }
1788            if (errCode) {
1789                ++st.numDelFailure;
1790            } else {
1791                st.delTimeHisto.add(committedReqs[index]->getDelta() / 1000);
1792            }
1793            committedReqs[index]->getDelCallback()->callback(rv);
1794        } else {
1795            int rv = getMutationStatus(errCode);
1796            const std::string &key = committedReqs[index]->getKey();
1797            bool insertion = !kvctx.keyStats[key].first;
1798            if (errCode) {
1799                ++st.numSetFailure;
1800            } else {
1801                st.writeTimeHisto.add(committedReqs[index]->getDelta() / 1000);
1802                st.writeSizeHisto.add(dataSize + keySize);
1803            }
1804            mutation_result p(rv, insertion);
1805            committedReqs[index]->getSetCallback()->callback(p);
1806        }
1807    }
1808}
1809
1810void CouchKVStore::readVBState(Db *db, uint16_t vbId, vbucket_state &vbState)
1811{
1812    sized_buf id;
1813    LocalDoc *ldoc = NULL;
1814    couchstore_error_t errCode;
1815
1816    vbState.state = vbucket_state_dead;
1817    vbState.checkpointId = 0;
1818    vbState.maxDeletedSeqno = 0;
1819
1820    id.buf = (char *)"_local/vbstate";
1821    id.size = sizeof("_local/vbstate") - 1;
1822    errCode = couchstore_open_local_document(db, (void *)id.buf,
1823                                             id.size, &ldoc);
1824    if (errCode != COUCHSTORE_SUCCESS) {
1825        LOG(EXTENSION_LOG_DEBUG,
1826            "Warning: failed to retrieve stat info for vBucket=%d error=%s [%s]",
1827            vbId, couchstore_strerror(errCode),
1828            couchkvstore_strerrno(db, errCode).c_str());
1829    } else {
1830        const std::string statjson(ldoc->json.buf, ldoc->json.size);
1831        cJSON *jsonObj = cJSON_Parse(statjson.c_str());
1832        if (!jsonObj) {
1833            LOG(EXTENSION_LOG_WARNING,
1834                "Warning: failed to parse the vbstat json doc for vbucket %d: %s",
1835                vbId, statjson.c_str());
1836            couchstore_free_local_document(ldoc);
1837            return;
1838        }
1839
1840        const std::string state = getJSONObjString(
1841                                cJSON_GetObjectItem(jsonObj, "state"));
1842        const std::string checkpoint_id = getJSONObjString(
1843                                cJSON_GetObjectItem(jsonObj,"checkpoint_id"));
1844        const std::string max_deleted_seqno = getJSONObjString(
1845                                cJSON_GetObjectItem(jsonObj, "max_deleted_seqno"));
1846        cJSON *failover_json = cJSON_GetObjectItem(jsonObj, "failover_table");
1847        if (state.compare("") == 0 || checkpoint_id.compare("") == 0
1848                || max_deleted_seqno.compare("") == 0) {
1849            LOG(EXTENSION_LOG_WARNING,
1850                "Warning: state JSON doc for vbucket %d is in the wrong format: %s",
1851                vbId, statjson.c_str());
1852        } else {
1853            vbState.state = VBucket::fromString(state.c_str());
1854            parseUint64(max_deleted_seqno.c_str(), &vbState.maxDeletedSeqno);
1855            parseUint64(checkpoint_id.c_str(), &vbState.checkpointId);
1856
1857            if (failover_json) {
1858                char* json = cJSON_PrintUnformatted(failover_json);
1859                vbState.failovers.assign(json);
1860                free(json);
1861            }
1862        }
1863        cJSON_Delete(jsonObj);
1864        couchstore_free_local_document(ldoc);
1865
1866    }
1867
1868    DbInfo info;
1869    errCode = couchstore_db_info(db, &info);
1870    if (errCode == COUCHSTORE_SUCCESS) {
1871        vbState.highSeqno = info.last_sequence;
1872        vbState.purgeSeqno = info.purge_seq;
1873    } else {
1874        LOG(EXTENSION_LOG_WARNING,
1875            "Warning: failed to read database info for vBucket = %d", id);
1876        abort();
1877    }
1878}
1879
1880couchstore_error_t CouchKVStore::saveVBState(Db *db, vbucket_state &vbState)
1881{
1882    std::stringstream jsonState;
1883
1884    jsonState << "{\"state\": \"" << VBucket::toString(vbState.state) << "\""
1885              << ",\"checkpoint_id\": \"" << vbState.checkpointId << "\""
1886              << ",\"max_deleted_seqno\": \"" << vbState.maxDeletedSeqno << "\""
1887              << ",\"failover_table\": " << vbState.failovers
1888              << "}";
1889
1890    LocalDoc lDoc;
1891    lDoc.id.buf = (char *)"_local/vbstate";
1892    lDoc.id.size = sizeof("_local/vbstate") - 1;
1893    std::string state = jsonState.str();
1894    lDoc.json.buf = (char *)state.c_str();
1895    lDoc.json.size = state.size();
1896    lDoc.deleted = 0;
1897
1898    couchstore_error_t errCode = couchstore_save_local_document(db, &lDoc);
1899    if (errCode != COUCHSTORE_SUCCESS) {
1900        LOG(EXTENSION_LOG_WARNING,
1901            "Warning: couchstore_save_local_document failed "
1902            "error=%s [%s]\n", couchstore_strerror(errCode),
1903            couchkvstore_strerrno(db, errCode).c_str());
1904    }
1905    return errCode;
1906}
1907
1908int CouchKVStore::getMultiCb(Db *db, DocInfo *docinfo, void *ctx)
1909{
1910    cb_assert(docinfo);
1911    std::string keyStr(docinfo->id.buf, docinfo->id.size);
1912    cb_assert(ctx);
1913    GetMultiCbCtx *cbCtx = static_cast<GetMultiCbCtx *>(ctx);
1914    CouchKVStoreStats &st = cbCtx->cks.getCKVStoreStat();
1915
1916
1917    vb_bgfetch_queue_t::iterator qitr = cbCtx->fetches.find(keyStr);
1918    if (qitr == cbCtx->fetches.end()) {
1919        // this could be a serious race condition in couchstore,
1920        // log a warning message and continue
1921        LOG(EXTENSION_LOG_WARNING,
1922            "Warning: couchstore returned invalid docinfo, "
1923            "no pending bgfetch has been issued for key = %s\n",
1924            keyStr.c_str());
1925        return 0;
1926    }
1927
1928    bool meta_only = true;
1929    std::list<VBucketBGFetchItem *> &fetches = (*qitr).second;
1930    std::list<VBucketBGFetchItem *>::iterator itr = fetches.begin();
1931    for (; itr != fetches.end(); ++itr) {
1932        if (!((*itr)->metaDataOnly)) {
1933            meta_only = false;
1934            break;
1935        }
1936    }
1937
1938    GetValue returnVal;
1939    couchstore_error_t errCode = cbCtx->cks.fetchDoc(db, docinfo, returnVal,
1940                                                     cbCtx->vbId, meta_only);
1941    if (errCode != COUCHSTORE_SUCCESS) {
1942        LOG(EXTENSION_LOG_WARNING, "Warning: failed to fetch data from database, "
1943            "vBucket=%d key=%s error=%s [%s]", cbCtx->vbId,
1944            keyStr.c_str(), couchstore_strerror(errCode),
1945            couchkvstore_strerrno(db, errCode).c_str());
1946        st.numGetFailure++;
1947    }
1948
1949    returnVal.setStatus(cbCtx->cks.couchErr2EngineErr(errCode));
1950    for (itr = fetches.begin(); itr != fetches.end(); ++itr) {
1951        // populate return value for remaining fetch items with the
1952        // same seqid
1953        (*itr)->value = returnVal;
1954        st.readTimeHisto.add((gethrtime() - (*itr)->initTime) / 1000);
1955        if (errCode == COUCHSTORE_SUCCESS) {
1956            st.readSizeHisto.add(returnVal.getValue()->getNKey() +
1957                                 returnVal.getValue()->getNBytes());
1958        }
1959    }
1960    return 0;
1961}
1962
1963
1964void CouchKVStore::closeDatabaseHandle(Db *db) {
1965    couchstore_error_t ret = couchstore_close_db(db);
1966    if (ret != COUCHSTORE_SUCCESS) {
1967        LOG(EXTENSION_LOG_WARNING,
1968            "Warning: couchstore_close_db failed, error=%s [%s]",
1969            couchstore_strerror(ret), couchkvstore_strerrno(NULL, ret).c_str());
1970    }
1971    st.numClose++;
1972}
1973
1974ENGINE_ERROR_CODE CouchKVStore::couchErr2EngineErr(couchstore_error_t errCode)
1975{
1976    switch (errCode) {
1977    case COUCHSTORE_SUCCESS:
1978        return ENGINE_SUCCESS;
1979    case COUCHSTORE_ERROR_ALLOC_FAIL:
1980        return ENGINE_ENOMEM;
1981    case COUCHSTORE_ERROR_DOC_NOT_FOUND:
1982        return ENGINE_KEY_ENOENT;
1983    case COUCHSTORE_ERROR_NO_SUCH_FILE:
1984    case COUCHSTORE_ERROR_NO_HEADER:
1985    default:
1986        // same as the general error return code of
1987        // EvetuallyPersistentStore::getInternal
1988        return ENGINE_TMPFAIL;
1989    }
1990}
1991
1992size_t CouchKVStore::getEstimatedItemCount(std::vector<uint16_t> &vbs)
1993{
1994    size_t items = 0;
1995    std::vector<uint16_t>::iterator it;
1996    for (it = vbs.begin(); it != vbs.end(); ++it) {
1997        items += getNumItems(*it);
1998    }
1999    return items;
2000}
2001
2002size_t CouchKVStore::getNumPersistedDeletes(uint16_t vbid) {
2003    std::map<uint16_t, size_t>::iterator itr = cachedDeleteCount.find(vbid);
2004    if (itr != cachedDeleteCount.end()) {
2005        return itr->second;
2006    }
2007
2008    if (!dbFileRevMapPopulated) {
2009        std::vector<std::string> files;
2010        // first scan all db files from data directory
2011        discoverDbFiles(dbname, files);
2012        populateFileNameMap(files);
2013    }
2014
2015    Db *db = NULL;
2016    uint64_t rev = dbFileRevMap[vbid];
2017    couchstore_error_t errCode = openDB(vbid, rev, &db,
2018                                        COUCHSTORE_OPEN_FLAG_RDONLY);
2019    if (errCode == COUCHSTORE_SUCCESS) {
2020        DbInfo info;
2021        errCode = couchstore_db_info(db, &info);
2022        if (errCode == COUCHSTORE_SUCCESS) {
2023            cachedDeleteCount[vbid] = info.deleted_count;
2024            closeDatabaseHandle(db);
2025            return info.deleted_count;
2026        } else {
2027            LOG(EXTENSION_LOG_WARNING,
2028                "Warning: failed to read database info for "
2029                "vBucket = %d rev = %llu\n", vbid, rev);
2030        }
2031        closeDatabaseHandle(db);
2032    } else {
2033        LOG(EXTENSION_LOG_WARNING,
2034            "Warning: failed to open database file for "
2035            "vBucket = %d rev = %llu\n", vbid, rev);
2036    }
2037    return 0;
2038
2039}
2040
2041size_t CouchKVStore::getNumItems(uint16_t vbid) {
2042    unordered_map<uint16_t, size_t>::iterator itr = cachedDocCount.find(vbid);
2043    if (itr != cachedDocCount.end()) {
2044        return itr->second;
2045    }
2046
2047    if (!dbFileRevMapPopulated) {
2048        std::vector<std::string> files;
2049        // first scan all db files from data directory
2050        discoverDbFiles(dbname, files);
2051        populateFileNameMap(files);
2052    }
2053
2054    Db *db = NULL;
2055    uint64_t rev = dbFileRevMap[vbid];
2056    couchstore_error_t errCode = openDB(vbid, rev, &db,
2057                                        COUCHSTORE_OPEN_FLAG_RDONLY);
2058    if (errCode == COUCHSTORE_SUCCESS) {
2059        DbInfo info;
2060        errCode = couchstore_db_info(db, &info);
2061        if (errCode == COUCHSTORE_SUCCESS) {
2062            cachedDocCount[vbid] = info.doc_count;
2063            closeDatabaseHandle(db);
2064            return info.doc_count;
2065        } else {
2066            LOG(EXTENSION_LOG_WARNING,
2067                "Warning: failed to read database info for "
2068                "vBucket = %d rev = %llu\n", vbid, rev);
2069        }
2070        closeDatabaseHandle(db);
2071    } else {
2072        LOG(EXTENSION_LOG_WARNING,
2073            "Warning: failed to open database file for "
2074            "vBucket = %d rev = %llu\n", vbid, rev);
2075    }
2076    return 0;
2077}
2078
2079size_t CouchKVStore::getNumItems(uint16_t vbid, uint64_t min_seq,
2080                                 uint64_t max_seq) {
2081    if (!dbFileRevMapPopulated) {
2082        std::vector<std::string> files;
2083        discoverDbFiles(dbname, files);
2084        populateFileNameMap(files);
2085    }
2086
2087    Db *db = NULL;
2088    uint64_t count = 0;
2089    uint64_t rev = dbFileRevMap[vbid];
2090    couchstore_error_t errCode = openDB(vbid, rev, &db,
2091                                        COUCHSTORE_OPEN_FLAG_RDONLY);
2092    if (errCode == COUCHSTORE_SUCCESS) {
2093        errCode = couchstore_changes_count(db, min_seq, max_seq, &count);
2094        if (errCode != COUCHSTORE_SUCCESS) {
2095            LOG(EXTENSION_LOG_WARNING, "Failed to get changes count for "
2096                "vBucket = %d rev = %llu", vbid, rev);
2097        }
2098        closeDatabaseHandle(db);
2099    } else {
2100        LOG(EXTENSION_LOG_WARNING, "Failed to open database file for vBucket"
2101            " = %d rev = %llu", vbid, rev);
2102    }
2103    return count;
2104}
2105
2106rollback_error_code
2107CouchKVStore::rollback(uint16_t vbid,
2108                       uint64_t rollbackSeqno,
2109                       shared_ptr<RollbackCB> cb) {
2110
2111    Db *db = NULL;
2112    DbInfo info;
2113    uint64_t fileRev = dbFileRevMap[vbid];
2114    std::stringstream dbFileName;
2115    dbFileName << dbname << "/" << vbid << ".couch." << fileRev;
2116    couchstore_error_t errCode;
2117    rollback_error_code err;
2118
2119    errCode = openDB(vbid, fileRev, &db,
2120                     (uint64_t) COUCHSTORE_OPEN_FLAG_RDONLY);
2121
2122    if (errCode == COUCHSTORE_SUCCESS) {
2123        errCode = couchstore_db_info(db, &info);
2124        if (errCode != COUCHSTORE_SUCCESS) {
2125            LOG(EXTENSION_LOG_WARNING,
2126                "Failed to read DB info, name=%s",
2127                dbFileName.str().c_str());
2128            closeDatabaseHandle(db);
2129            err.first = ENGINE_ROLLBACK;
2130            err.second = 0;
2131            return err;
2132        }
2133    } else {
2134        LOG(EXTENSION_LOG_WARNING,
2135                "Failed to open database, name=%s",
2136                dbFileName.str().c_str());
2137        err.first = ENGINE_ROLLBACK;
2138        err.second = 0;
2139        return err;
2140    }
2141
2142    uint64_t latestSeqno = info.last_sequence;
2143
2144    //Count from latest seq no to 0
2145    uint64_t totSeqCount = 0;
2146    errCode = couchstore_changes_count(db, 0, latestSeqno, &totSeqCount);
2147    if (errCode != COUCHSTORE_SUCCESS) {
2148        LOG(EXTENSION_LOG_WARNING, "Failed to get changes count for "
2149            "rollback vBucket = %d, rev = %llu", vbid, fileRev);
2150        closeDatabaseHandle(db);
2151        err.first = ENGINE_ROLLBACK;
2152        err.second = 0;
2153        return err;
2154    }
2155
2156    Db *newdb = NULL;
2157    errCode = openDB(vbid, fileRev, &newdb, 0);
2158    if (errCode != COUCHSTORE_SUCCESS) {
2159        LOG(EXTENSION_LOG_WARNING,
2160                "Failed to open database, name=%s",
2161                dbFileName.str().c_str());
2162        closeDatabaseHandle(db);
2163        err.first = ENGINE_ROLLBACK;
2164        err.second = 0;
2165        return err;
2166    }
2167
2168    while (info.last_sequence > rollbackSeqno) {
2169        errCode = couchstore_rewind_db_header(newdb);
2170        if (errCode != COUCHSTORE_SUCCESS) {
2171            LOG(EXTENSION_LOG_WARNING,
2172                    "Failed to rewind Db pointer "
2173                    "for couch file with vbid: %u, whose "
2174                    "lastSeqno: %llu, while trying to roll back "
2175                    "to seqNo: %llu", vbid, latestSeqno, rollbackSeqno);
2176            //Reset the vbucket and send the entire snapshot,
2177            //as a previous header wasn't found.
2178            closeDatabaseHandle(db);
2179            closeDatabaseHandle(newdb);
2180            err.first = ENGINE_ROLLBACK;
2181            err.second = 0;
2182            return err;
2183        }
2184        errCode = couchstore_db_info(newdb, &info);
2185        if (errCode != COUCHSTORE_SUCCESS) {
2186            LOG(EXTENSION_LOG_WARNING,
2187                "Failed to read DB info, name=%s",
2188                dbFileName.str().c_str());
2189            closeDatabaseHandle(db);
2190            closeDatabaseHandle(newdb);
2191            err.first = ENGINE_ROLLBACK;
2192            err.second = 0;
2193            return err;
2194        }
2195    }
2196
2197    //Count from latest seq no to rollback seq no
2198    uint64_t rollbackSeqCount = 0;
2199    errCode = couchstore_changes_count(db, info.last_sequence, latestSeqno,
2200                                       &rollbackSeqCount);
2201    if (errCode != COUCHSTORE_SUCCESS) {
2202        LOG(EXTENSION_LOG_WARNING, "Failed to get changes count for "
2203            "rollback vBucket = %d, rev = %llu", vbid, fileRev);
2204        closeDatabaseHandle(db);
2205        closeDatabaseHandle(newdb);
2206        err.first = ENGINE_ROLLBACK;
2207        err.second = 0;
2208        return err;
2209    }
2210
2211    if ((totSeqCount / 2) <= rollbackSeqCount) {
2212        //doresetVbucket flag set or rollback is greater than 50%,
2213        //reset the vbucket and send the entire snapshot
2214        closeDatabaseHandle(db);
2215        closeDatabaseHandle(newdb);
2216        err.first = ENGINE_ROLLBACK;
2217        err.second = 0;
2218        return err;
2219    }
2220
2221    cb->setDbHeader(newdb);
2222    shared_ptr<Callback<CacheLookup> > cl(new NoLookupCallback());
2223    LoadResponseCtx ctx;
2224    ctx.vbucketId = vbid;
2225    ctx.keysonly = true;
2226    ctx.lookup = cl;
2227    ctx.callback = cb;
2228    ctx.stats = &epStats;
2229    errCode = couchstore_changes_since(db, info.last_sequence + 1,
2230                                       COUCHSTORE_NO_OPTIONS,
2231                                       recordDbDumpC,
2232                                       static_cast<void *>(&ctx));
2233    if (errCode != COUCHSTORE_SUCCESS) {
2234        if (errCode == COUCHSTORE_ERROR_CANCEL) {
2235            LOG(EXTENSION_LOG_WARNING,
2236                "Canceling loading database\n");
2237        } else {
2238            LOG(EXTENSION_LOG_WARNING,
2239                "Couchstore_changes_since failed, error=%s [%s]",
2240                couchstore_strerror(errCode),
2241                couchkvstore_strerrno(db, errCode).c_str());
2242        }
2243        closeDatabaseHandle(db);
2244        closeDatabaseHandle(newdb);
2245        err.first = ENGINE_ROLLBACK;
2246        err.second = 0;
2247        return err;
2248    }
2249
2250    closeDatabaseHandle(db);
2251    //Append the rewinded header to the database file, before closing handle
2252    errCode = couchstore_commit(newdb);
2253    closeDatabaseHandle(newdb);
2254
2255    if (errCode != COUCHSTORE_SUCCESS) {
2256        err.first = ENGINE_ROLLBACK;
2257        err.second = 0;
2258        return err;
2259    }
2260
2261    err.first = ENGINE_SUCCESS;
2262    err.second = info.last_sequence;
2263    return err;
2264}
2265
2266int populateAllKeys(Db *db, DocInfo *docinfo, void *ctx) {
2267    AllKeysCtx *allKeysCtx = (AllKeysCtx *)ctx;
2268    uint16_t keylen = docinfo->id.size;
2269    char *key = docinfo->id.buf;
2270    (allKeysCtx->cb)->addtoAllKeys(keylen, key);
2271    if (--(allKeysCtx->count) <= 0) {
2272        //Only when count met is less than the actual number of entries
2273        return COUCHSTORE_ERROR_CANCEL;
2274    }
2275    return COUCHSTORE_SUCCESS;
2276}
2277
2278ENGINE_ERROR_CODE CouchKVStore::getAllKeys(uint16_t vbid,
2279                                           std::string &start_key,
2280                                           uint32_t count,
2281                                           AllKeysCB *cb) {
2282    Db *db = NULL;
2283    uint64_t rev = dbFileRevMap[vbid];
2284    couchstore_error_t errCode = openDB(vbid, rev, &db,
2285                                        COUCHSTORE_OPEN_FLAG_RDONLY);
2286    if(errCode == COUCHSTORE_SUCCESS) {
2287        sized_buf ref = {NULL, 0};
2288        ref.buf = (char*) start_key.c_str();
2289        ref.size = start_key.size();
2290        AllKeysCtx ctx(cb, count);
2291        errCode = couchstore_all_docs(db, &ref, COUCHSTORE_NO_OPTIONS,
2292                                      populateAllKeys,
2293                                      static_cast<void *>(&ctx));
2294        closeDatabaseHandle(db);
2295        if (errCode == COUCHSTORE_SUCCESS ||
2296                errCode == COUCHSTORE_ERROR_CANCEL)  {
2297            return ENGINE_SUCCESS;
2298        } else {
2299            LOG(EXTENSION_LOG_WARNING, "couchstore_all_docs failed for "
2300                    "database file of vbucket = %d rev = %llu, errCode = %u\n",
2301                    vbid, rev, errCode);
2302        }
2303    } else {
2304        LOG(EXTENSION_LOG_WARNING, "Failed to open database file for "
2305                "vbucket = %d rev = %llu, errCode = %u\n", vbid, rev, errCode);
2306
2307    }
2308    return ENGINE_FAILED;
2309}
2310
2311void CouchKVStore::removeCompactFile(const std::string &dbname,
2312                                     uint16_t vbid,
2313                                     uint64_t fileRev)
2314{
2315    std::string dbfile = getDBFileName(dbname, vbid, fileRev);
2316    std::string compact_file = dbfile + ".compact";
2317    removeCompactFile(compact_file);
2318}
2319
2320void CouchKVStore::removeCompactFile(const std::string &filename)
2321{
2322    if (access(filename.c_str(), F_OK) == 0) {
2323        if (remove(filename.c_str()) == 0) {
2324            LOG(EXTENSION_LOG_WARNING,
2325                "Warning: Removed compact file '%s'", filename.c_str());
2326        }
2327        else {
2328            LOG(EXTENSION_LOG_WARNING,
2329                "Warning: Failed to remove compact file '%s': %s",
2330                filename.c_str(), getSystemStrerror().c_str());
2331        }
2332    }
2333}
2334
2335
2336/* end of couch-kvstore.cc */
2337