1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2012 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "config.h"
19
20#ifdef _MSC_VER
21#include <direct.h>
22#define mkdir(a, b) _mkdir(a)
23#endif
24
25#include <fcntl.h>
26#include <stdio.h>
27#include <string.h>
28#include <sys/stat.h>
29#include <sys/types.h>
30
31#include <algorithm>
32#include <cctype>
33#include <cstdlib>
34#include <fstream>
35#include <iostream>
36#include <list>
37#include <map>
38#include <string>
39#include <utility>
40#include <vector>
41#include <platform/dirutils.h>
42#include <cJSON.h>
43
44#include "common.h"
45#include "couch-kvstore/couch-kvstore.h"
46#define STATWRITER_NAMESPACE couchstore_engine
47#include "statwriter.h"
48#undef STATWRITER_NAMESPACE
49
50using namespace CouchbaseDirectoryUtilities;
51
// Outcomes reported by getMutationStatus() for a flushed mutation.
static const int MUTATION_SUCCESS = 1;
static const int DOC_NOT_FOUND = 0;
static const int MUTATION_FAILED = -1;

// Retry limit for opening a database file (its user is not visible in this
// part of the file -- presumably openDB(); confirm).
static const int MAX_OPEN_DB_RETRY = 10;

// Length of the fixed metadata prefix CouchRequest writes: an 8-byte CAS,
// a 4-byte expiry and 4-byte flags, followed by the flex-meta code byte.
static const uint32_t DEFAULT_META_LEN = 16;
59
60class NoLookupCallback : public Callback<CacheLookup> {
61public:
62    NoLookupCallback() {}
63    ~NoLookupCallback() {}
64    void callback(CacheLookup&) {}
65};
66
67extern "C" {
68    static int recordDbDumpC(Db *db, DocInfo *docinfo, void *ctx)
69    {
70        return CouchKVStore::recordDbDump(db, docinfo, ctx);
71    }
72}
73
74extern "C" {
75    static int getMultiCbC(Db *db, DocInfo *docinfo, void *ctx)
76    {
77        return CouchKVStore::getMultiCb(db, docinfo, ctx);
78    }
79}
80
81static std::string getStrError(Db *db) {
82    const size_t max_msg_len = 256;
83    char msg[max_msg_len];
84    couchstore_last_os_error(db, msg, max_msg_len);
85    std::string errorStr(msg);
86    return errorStr;
87}
88
/**
 * Describe the most recent operating-system error as
 * "errno = <code>: '<message>'".
 *
 * On Windows the code comes from GetLastError() and the text from
 * FormatMessageA(); elsewhere it comes from errno / strerror().
 *
 * The error code is captured first, before constructing the stream or
 * calling anything else that might overwrite it. The previous version
 * read errno twice inside one unsequenced << chain.
 */
static std::string getSystemStrerror(void) {
#ifdef WIN32
    DWORD err = GetLastError();
#else
    const int err = errno;
#endif
    std::stringstream ss;
#ifdef WIN32
    char* win_msg = NULL;
    // With FORMAT_MESSAGE_ALLOCATE_BUFFER the buffer argument is really a
    // char** for the ANSI variant, hence LPSTR (not LPTSTR, which becomes
    // wchar_t* in UNICODE builds).
    DWORD len = FormatMessageA(FORMAT_MESSAGE_ALLOCATE_BUFFER |
                               FORMAT_MESSAGE_FROM_SYSTEM |
                               FORMAT_MESSAGE_IGNORE_INSERTS,
                               NULL, err,
                               MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
                               reinterpret_cast<LPSTR>(&win_msg),
                               0, NULL);
    // Never stream a NULL char* if FormatMessageA failed.
    ss << "errno = " << err << ": '"
       << ((len != 0 && win_msg != NULL) ? win_msg : "unknown error") << "'";
    if (win_msg != NULL) {
        LocalFree(win_msg);
    }
#else
    ss << "errno = " << err << ": '" << strerror(err) << "'";
#endif

    return ss.str();
}
109
110static const std::string getJSONObjString(const cJSON *i)
111{
112    if (i == NULL) {
113        return "";
114    }
115    if (i->type != cJSON_String) {
116        abort();
117    }
118    return i->valuestring;
119}
120
/**
 * Return true when @p filename ends with ".compact", the suffix used for
 * temporary compaction output files.
 *
 * The suffix length excludes the string literal's terminating NUL. The
 * old sizeof(".compact") comparison was off by one: it never matched
 * real "*.compact" names, but did match names with one extra trailing
 * character.
 */
static bool endWithCompact(const std::string &filename)
{
    static const char suffix[] = ".compact";
    const size_t suffixLen = sizeof(suffix) - 1; // drop the NUL
    return filename.size() >= suffixLen &&
           filename.compare(filename.size() - suffixLen, suffixLen,
                            suffix) == 0;
}
129
130static void discoverDbFiles(const std::string &dir, std::vector<std::string> &v)
131{
132    std::vector<std::string> files = findFilesContaining(dir, ".couch");
133    std::vector<std::string>::iterator ii;
134    for (ii = files.begin(); ii != files.end(); ++ii) {
135        if (!endWithCompact(*ii)) {
136            v.push_back(*ii);
137        }
138    }
139}
140
141static uint64_t computeMaxDeletedSeqNum(DocInfo **docinfos, const int numdocs)
142{
143    uint64_t max = 0;
144    for (int idx = 0; idx < numdocs; idx++) {
145        if (docinfos[idx]->deleted) {
146            // check seq number only from a deleted file
147            uint64_t seqNum = docinfos[idx]->rev_seq;
148            max = std::max(seqNum, max);
149        }
150    }
151    return max;
152}
153
154static int getMutationStatus(couchstore_error_t errCode)
155{
156    switch (errCode) {
157    case COUCHSTORE_SUCCESS:
158        return MUTATION_SUCCESS;
159    case COUCHSTORE_ERROR_NO_HEADER:
160    case COUCHSTORE_ERROR_NO_SUCH_FILE:
161    case COUCHSTORE_ERROR_DOC_NOT_FOUND:
162        // this return causes ep engine to drop the failed flush
163        // of an item since it does not know about the itme any longer
164        return DOC_NOT_FOUND;
165    default:
166        // this return causes ep engine to keep requeuing the failed
167        // flush of an item
168        return MUTATION_FAILED;
169    }
170}
171
/**
 * Return true if every character of @p input is a decimal digit according
 * to std::isdigit. An empty string yields true.
 *
 * Each char is converted to unsigned char before the std::isdigit call.
 * Passing a negative char (any byte >= 0x80 where char is signed) is
 * undefined behaviour. The parameter is a const reference because the
 * string is never modified.
 */
static bool allDigit(const std::string &input)
{
    const size_t numchar = input.length();
    for (size_t i = 0; i < numchar; ++i) {
        if (!std::isdigit(static_cast<unsigned char>(input[i]))) {
            return false;
        }
    }
    return true;
}
182
183static std::string couchkvstore_strerrno(Db *db, couchstore_error_t err) {
184    return (err == COUCHSTORE_ERROR_OPEN_FILE ||
185            err == COUCHSTORE_ERROR_READ ||
186            err == COUCHSTORE_ERROR_WRITE) ? getStrError(db) : "none";
187}
188
189struct GetMultiCbCtx {
190    GetMultiCbCtx(CouchKVStore &c, uint16_t v, vb_bgfetch_queue_t &f) :
191        cks(c), vbId(v), fetches(f) {}
192
193    CouchKVStore &cks;
194    uint16_t vbId;
195    vb_bgfetch_queue_t &fetches;
196};
197
198struct StatResponseCtx {
199public:
200    StatResponseCtx(std::map<std::pair<uint16_t, uint16_t>, vbucket_state> &sm,
201                    uint16_t vb) : statMap(sm), vbId(vb) {
202        /* EMPTY */
203    }
204
205    std::map<std::pair<uint16_t, uint16_t>, vbucket_state> &statMap;
206    uint16_t vbId;
207};
208
/**
 * Plain aggregate describing one load/scan of a vbucket.
 * Its user is not visible in this part of the file; it is presumably the
 * ctx handed to recordDbDump (confirm).
 * Member order matters for aggregate initialisation -- do not reorder.
 */
struct LoadResponseCtx {
    shared_ptr<Callback<GetValue> > callback;   // receives loaded items
    shared_ptr<Callback<CacheLookup> > lookup;  // cache-lookup callback
    uint16_t vbucketId;
    uint64_t endSeqno;      // presumably the last seqno to load -- confirm
    bool keysonly;          // presumably "load keys without values" -- confirm
    EPStats *stats;
};
217
218struct AllKeysCtx {
219    AllKeysCtx(AllKeysCB *callback, uint32_t cnt) :
220        cb(callback), count(cnt) { }
221
222    AllKeysCB *cb;
223    uint32_t count;
224};
225
/**
 * Build a pending couchstore write (set or delete) for item @p it.
 *
 * @param it   item whose key, value, CAS, flags, expiry and seqnos are
 *             encoded into the couchstore Doc/DocInfo held by this request
 * @param rev  vbucket database file revision, stored in fileRevNum
 * @param cb   caller callbacks; only setCb (sets) or delCb (deletes) is kept
 * @param del  true to build a deletion, false for a set
 *
 * dbDoc.id and dbDocInfo.rev_meta point into this object's own `key` and
 * `meta` members, and dbDoc.data points into the `value` blob. Those
 * pointers are only valid while this object (and `value`) is alive.
 */
CouchRequest::CouchRequest(const Item &it, uint64_t rev,
                           CouchRequestCallback &cb, bool del) :
    value(it.getValue()), vbucketId(it.getVBucketId()), fileRevNum(rev),
    key(it.getKey()), deleteItem(del)
{
    // CAS and expiry are stored big-endian; flags are copied as returned
    // by getFlags().
    uint64_t cas = htonll(it.getCas());
    uint32_t flags = it.getFlags();
    uint32_t vlen = it.getNBytes();
    uint32_t exptime = it.getExptime();

    // Datatype used to determine whether document requires compression or not
    uint8_t datatype;

    // Save time of deletion in expiry time field of deleted item's metadata.
    if (del) {
        exptime = ep_real_time();
    }
    exptime = htonl(exptime);

    dbDoc.id.buf = const_cast<char *>(key.c_str());
    dbDoc.id.size = it.getNKey();
    if (vlen) {
        dbDoc.data.buf = const_cast<char *>(value->getData());
        dbDoc.data.size = vlen;
        datatype = it.getDataType();
    } else {
        // Empty body: no data pointer, and the datatype is treated as 0.
        dbDoc.data.buf = NULL;
        dbDoc.data.size = 0;
        datatype = 0x00;
    }

    // Metadata layout: [0,8) CAS, [8,12) expiry, [12,16) flags,
    // [DEFAULT_META_LEN] flex-meta code, then the item's extended meta at
    // DEFAULT_META_LEN + FLEX_DATA_OFFSET.
    // NOTE(review): assumes getExtMetaLen() fits in `meta` after that
    // offset -- the size of `meta` is not visible here; confirm.
    memset(meta, 0, sizeof(meta));
    memcpy(meta, &cas, 8);
    memcpy(meta + 8, &exptime, 4);
    memcpy(meta + 12, &flags, 4);
    *(meta + DEFAULT_META_LEN) = FLEX_META_CODE;
    memcpy(meta + DEFAULT_META_LEN + FLEX_DATA_OFFSET, it.getExtMeta(),
           it.getExtMetaLen());

    dbDocInfo.db_seq = it.getBySeqno();
    dbDocInfo.rev_meta.buf = reinterpret_cast<char *>(meta);
    dbDocInfo.rev_meta.size = COUCHSTORE_METADATA_SIZE;
    dbDocInfo.rev_seq = it.getRevSeqno();
    dbDocInfo.size = dbDoc.data.size;
    if (del) {
        dbDocInfo.deleted =  1;
        callback.delCb = cb.delCb;
    } else {
        dbDocInfo.deleted = 0;
        callback.setCb = cb.setCb;
    }
    dbDocInfo.id = dbDoc.id;
    dbDocInfo.content_meta = (datatype == PROTOCOL_BINARY_DATATYPE_JSON) ?
                                    COUCH_DOC_IS_JSON : COUCH_DOC_NON_JSON_MODE;

    //Compress only those documents that aren't already compressed.
    // Only non-empty, non-deleted RAW_BYTES or JSON bodies get the
    // COUCH_DOC_IS_COMPRESSED flag; other datatypes (presumably the
    // already-compressed variants -- confirm) are left unflagged.
    if (dbDoc.data.size > 0 && !deleteItem) {
        if (datatype == PROTOCOL_BINARY_RAW_BYTES ||
                datatype == PROTOCOL_BINARY_DATATYPE_JSON) {
            dbDocInfo.content_meta |= COUCH_DOC_IS_COMPRESSED;
        }
    }
    // Creation timestamp of the request (presumably used for latency stats).
    start = gethrtime();
}
290
291CouchKVStore::CouchKVStore(EPStats &stats, Configuration &config, bool read_only) :
292    KVStore(read_only), epStats(stats), configuration(config),
293    dbname(configuration.getDbname()), couchNotifier(NULL),
294    intransaction(false), dbFileRevMapPopulated(false)
295{
296    open();
297    statCollectingFileOps = getCouchstoreStatsOps(&st.fsStats);
298
299    // init db file map with default revision number, 1
300    numDbFiles = static_cast<uint16_t>(configuration.getMaxVbuckets());
301    for (uint16_t i = 0; i < numDbFiles; i++) {
302        dbFileRevMap.push_back(1);
303    }
304}
305
/**
 * Copy constructor. Shares the stats/configuration of @p copyFrom and
 * copies its db name, revision map, vbucket count and "map populated"
 * flag. The notifier is reset to NULL and the copy starts outside a
 * transaction. The store is then opened as in the primary constructor.
 */
CouchKVStore::CouchKVStore(const CouchKVStore &copyFrom) :
    KVStore(copyFrom), epStats(copyFrom.epStats),
    configuration(copyFrom.configuration),
    dbname(copyFrom.dbname),
    couchNotifier(NULL), dbFileRevMap(copyFrom.dbFileRevMap),
    numDbFiles(copyFrom.numDbFiles),
    intransaction(false),
    dbFileRevMapPopulated(copyFrom.dbFileRevMapPopulated)
{
    open();
    statCollectingFileOps = getCouchstoreStatsOps(&st.fsStats);
}
318
319void CouchKVStore::reset(uint16_t shardId)
320{
321    cb_assert(!isReadOnly());
322    // TODO CouchKVStore::flush() when couchstore api ready
323
324    if (shardId == 0) {
325        //Notify when PRIMARY_SHARD
326        RememberingCallback<bool> cb;
327        couchNotifier->flush(cb);
328        cb.waitForValue();
329    }
330
331    vbucket_map_t::iterator itor = cachedVBStates.begin();
332    for (; itor != cachedVBStates.end(); ++itor) {
333        uint16_t vbucket = itor->first;
334        if (vbucket % configuration.getMaxNumShards() != shardId) {
335            continue;
336        }
337        itor->second.checkpointId = 0;
338        itor->second.maxDeletedSeqno = 0;
339        resetVBucket(vbucket, itor->second);
340        updateDbFileMap(vbucket, 1);
341    }
342}
343
344void CouchKVStore::set(const Item &itm, Callback<mutation_result> &cb)
345{
346    cb_assert(!isReadOnly());
347    cb_assert(intransaction);
348    bool deleteItem = false;
349    CouchRequestCallback requestcb;
350    std::string dbFile;
351    uint64_t fileRev = dbFileRevMap[itm.getVBucketId()];
352
353    // each req will be de-allocated after commit
354    requestcb.setCb = &cb;
355    CouchRequest *req = new CouchRequest(itm, fileRev, requestcb, deleteItem);
356    pendingReqsQ.push_back(req);
357}
358
359void CouchKVStore::get(const std::string &key, uint64_t, uint16_t vb,
360                       Callback<GetValue> &cb, bool fetchDelete)
361{
362    Db *db = NULL;
363    std::string dbFile;
364    GetValue rv;
365    uint64_t fileRev = dbFileRevMap[vb];
366
367    couchstore_error_t errCode = openDB(vb, fileRev, &db,
368                                        COUCHSTORE_OPEN_FLAG_RDONLY);
369    if (errCode != COUCHSTORE_SUCCESS) {
370        ++st.numGetFailure;
371        LOG(EXTENSION_LOG_WARNING,
372            "Warning: failed to open database to retrieve data "
373            "from vBucketId = %d, key = %s, file = %s\n",
374            vb, key.c_str(), dbFile.c_str());
375        rv.setStatus(couchErr2EngineErr(errCode));
376        cb.callback(rv);
377        return;
378    }
379
380    getWithHeader(db, key, vb, cb, fetchDelete);
381    closeDatabaseHandle(db);
382}
383
384void CouchKVStore::getWithHeader(void *dbHandle, const std::string &key,
385                                 uint16_t vb, Callback<GetValue> &cb,
386                                 bool fetchDelete) {
387
388    Db *db = (Db *)dbHandle;
389    hrtime_t start = gethrtime();
390    RememberingCallback<GetValue> *rc = dynamic_cast<RememberingCallback<GetValue> *>(&cb);
391    bool getMetaOnly = rc && rc->val.isPartial();
392    DocInfo *docInfo = NULL;
393    sized_buf id;
394    GetValue rv;
395    std::string dbFile;
396
397    id.size = key.size();
398    id.buf = const_cast<char *>(key.c_str());
399    couchstore_error_t errCode = couchstore_docinfo_by_id(db, (uint8_t *)id.buf,
400                                                          id.size, &docInfo);
401    if (errCode != COUCHSTORE_SUCCESS) {
402        if (!getMetaOnly) {
403            // log error only if this is non-xdcr case
404            LOG(EXTENSION_LOG_WARNING,
405                "Warning: failed to retrieve doc info from "
406                "database, name=%s key=%s error=%s [%s]\n",
407                dbFile.c_str(), id.buf, couchstore_strerror(errCode),
408                couchkvstore_strerrno(db, errCode).c_str());
409        }
410    } else {
411        cb_assert(docInfo);
412        errCode = fetchDoc(db, docInfo, rv, vb, getMetaOnly, fetchDelete);
413        if (errCode != COUCHSTORE_SUCCESS) {
414            LOG(EXTENSION_LOG_WARNING,
415                "Warning: failed to retrieve key value from "
416                "database, name=%s key=%s error=%s [%s] "
417                "deleted=%s", dbFile.c_str(), id.buf,
418                couchstore_strerror(errCode),
419                couchkvstore_strerrno(db, errCode).c_str(),
420                docInfo->deleted ? "yes" : "no");
421        }
422
423        // record stats
424        st.readTimeHisto.add((gethrtime() - start) / 1000);
425        if (errCode == COUCHSTORE_SUCCESS) {
426            st.readSizeHisto.add(key.length() + rv.getValue()->getNBytes());
427        }
428    }
429
430    if(errCode != COUCHSTORE_SUCCESS) {
431        ++st.numGetFailure;
432    }
433
434    couchstore_free_docinfo(docInfo);
435    rv.setStatus(couchErr2EngineErr(errCode));
436    cb.callback(rv);
437}
438
439void CouchKVStore::getMulti(uint16_t vb, vb_bgfetch_queue_t &itms)
440{
441    std::string dbFile;
442    int numItems = itms.size();
443    uint64_t fileRev = dbFileRevMap[vb];
444
445    Db *db = NULL;
446    couchstore_error_t errCode = openDB(vb, fileRev, &db,
447                                        COUCHSTORE_OPEN_FLAG_RDONLY);
448    if (errCode != COUCHSTORE_SUCCESS) {
449        LOG(EXTENSION_LOG_WARNING,
450            "Warning: failed to open database for data fetch, "
451            "vBucketId = %d file = %s numDocs = %d\n",
452            vb, dbFile.c_str(), numItems);
453        st.numGetFailure.fetch_add(numItems);
454        vb_bgfetch_queue_t::iterator itr = itms.begin();
455        for (; itr != itms.end(); ++itr) {
456            std::list<VBucketBGFetchItem *> &fetches = (*itr).second;
457            std::list<VBucketBGFetchItem *>::iterator fitr = fetches.begin();
458            for (; fitr != fetches.end(); ++fitr) {
459                (*fitr)->value.setStatus(ENGINE_NOT_MY_VBUCKET);
460            }
461        }
462        return;
463    }
464
465    size_t idx = 0;
466    sized_buf *ids = new sized_buf[itms.size()];
467    vb_bgfetch_queue_t::iterator itr = itms.begin();
468    for (; itr != itms.end(); ++itr) {
469        ids[idx].size = itr->first.size();
470        ids[idx].buf = const_cast<char *>(itr->first.c_str());
471        ++idx;
472    }
473
474    GetMultiCbCtx ctx(*this, vb, itms);
475    errCode = couchstore_docinfos_by_id(db, ids, itms.size(),
476                                        0, getMultiCbC, &ctx);
477    if (errCode != COUCHSTORE_SUCCESS) {
478        st.numGetFailure.fetch_add(numItems);
479        for (itr = itms.begin(); itr != itms.end(); ++itr) {
480            LOG(EXTENSION_LOG_WARNING, "Warning: failed to read database by"
481                " vBucketId = %d key = %s file = %s error = %s [%s]\n",
482                vb, (*itr).first.c_str(),
483                dbFile.c_str(), couchstore_strerror(errCode),
484                couchkvstore_strerrno(db, errCode).c_str());
485            std::list<VBucketBGFetchItem *> &fetches = (*itr).second;
486            std::list<VBucketBGFetchItem *>::iterator fitr = fetches.begin();
487            for (; fitr != fetches.end(); ++fitr) {
488                (*fitr)->value.setStatus(couchErr2EngineErr(errCode));
489            }
490        }
491    }
492    closeDatabaseHandle(db);
493    delete []ids;
494}
495
496void CouchKVStore::del(const Item &itm,
497                       Callback<int> &cb)
498{
499    cb_assert(!isReadOnly());
500    cb_assert(intransaction);
501    uint16_t fileRev = dbFileRevMap[itm.getVBucketId()];
502    CouchRequestCallback requestcb;
503    requestcb.delCb = &cb;
504    CouchRequest *req = new CouchRequest(itm, fileRev, requestcb, true);
505    pendingReqsQ.push_back(req);
506}
507
508bool CouchKVStore::delVBucket(uint16_t vbucket, bool recreate)
509{
510    cb_assert(!isReadOnly());
511    cb_assert(couchNotifier);
512    RememberingCallback<bool> cb;
513
514    couchNotifier->delVBucket(vbucket, cb);
515    cb.waitForValue();
516
517    if (recreate) {
518        vbucket_state vbstate(vbucket_state_dead, 0, 0, 0);
519        vbucket_map_t::iterator it = cachedVBStates.find(vbucket);
520        if (it != cachedVBStates.end()) {
521            vbstate.state = it->second.state;
522        }
523        cachedVBStates[vbucket] = vbstate;
524        resetVBucket(vbucket, vbstate);
525    } else {
526        cachedVBStates.erase(vbucket);
527    }
528    updateDbFileMap(vbucket, 1);
529    return cb.val;
530}
531
532vbucket_map_t CouchKVStore::listPersistedVbuckets()
533{
534    std::vector<std::string> files;
535
536    if (!dbFileRevMapPopulated) {
537        // warmup, first discover db files from local directory
538        discoverDbFiles(dbname, files);
539        populateFileNameMap(files);
540    }
541
542    if (!cachedVBStates.empty()) {
543        cachedVBStates.clear();
544    }
545
546    Db *db = NULL;
547    couchstore_error_t errorCode;
548    for (uint16_t id = 0; id < numDbFiles; id++) {
549        uint64_t rev = dbFileRevMap[id];
550        errorCode = openDB(id, rev, &db, COUCHSTORE_OPEN_FLAG_RDONLY);
551        if (errorCode != COUCHSTORE_SUCCESS) {
552            std::stringstream revnum, vbid;
553            revnum  << rev;
554            vbid << id;
555            std::string fileName =
556                dbname + "/" + vbid.str() + ".couch." + revnum.str();
557            LOG(EXTENSION_LOG_WARNING,
558                "Warning: failed to open database file, name=%s\n",
559                fileName.c_str());
560            remVBucketFromDbFileMap(id);
561        } else {
562            vbucket_state vb_state;
563
564            /* read state of VBucket from db file */
565            readVBState(db, id, vb_state);
566            /* insert populated state to the array to return to the caller */
567            cachedVBStates[id] = vb_state;
568            /* update stat */
569            ++st.numLoadedVb;
570            closeDatabaseHandle(db);
571        }
572        db = NULL;
573        removeCompactFile(dbname, id, rev);
574    }
575
576    return cachedVBStates;
577}
578
579void CouchKVStore::getPersistedStats(std::map<std::string, std::string> &stats)
580{
581    char *buffer = NULL;
582    std::string fname = dbname + "/stats.json";
583    if (access(fname.c_str(), R_OK) == -1) {
584        return ;
585    }
586
587    std::ifstream session_stats;
588    session_stats.exceptions (session_stats.failbit | session_stats.badbit);
589    try {
590        session_stats.open(fname.c_str(), std::ios::binary);
591        session_stats.seekg(0, std::ios::end);
592        int flen = session_stats.tellg();
593        if (flen < 0) {
594            LOG(EXTENSION_LOG_WARNING,
595                "Warning: error in session stats ifstream!!!");
596            session_stats.close();
597            return;
598        }
599        session_stats.seekg(0, std::ios::beg);
600        buffer = new char[flen + 1];
601        session_stats.read(buffer, flen);
602        session_stats.close();
603        buffer[flen] = '\0';
604
605        cJSON *json_obj = cJSON_Parse(buffer);
606        if (!json_obj) {
607            LOG(EXTENSION_LOG_WARNING,
608                "Warning: failed to parse the session stats json doc!!!");
609            delete[] buffer;
610            return;
611        }
612
613        int json_arr_size = cJSON_GetArraySize(json_obj);
614        for (int i = 0; i < json_arr_size; ++i) {
615            cJSON *obj = cJSON_GetArrayItem(json_obj, i);
616            if (obj) {
617                stats[obj->string] = obj->valuestring ? obj->valuestring : "";
618            }
619        }
620        cJSON_Delete(json_obj);
621
622    } catch (const std::ifstream::failure &e) {
623        LOG(EXTENSION_LOG_WARNING,
624            "Warning: failed to load the engine session stats "
625            " due to IO exception \"%s\"", e.what());
626    } catch (...) {
627        LOG(EXTENSION_LOG_WARNING,
628            "Warning: failed to load the engine session stats "
629            " due to IO exception");
630    }
631
632    delete[] buffer;
633}
634
/**
 * Build the path of a vbucket database file:
 * "<dbname>/<vbid>.couch.<rev>".
 */
static std::string getDBFileName(const std::string &dbname,
                                 uint16_t vbid,
                                 uint64_t rev)
{
    std::stringstream leaf;
    leaf << vbid << ".couch." << rev;
    return dbname + "/" + leaf.str();
}
643
/**
 * Per-document compaction hook (installed by compactVBucket()).
 *
 * @param d      database being compacted (used only on the final call)
 * @param info   document being copied, or NULL once compaction is done
 * @param ctx_p  a compaction_ctx* carrying the purge policy and the
 *               accumulators this hook updates
 * @return COUCHSTORE_COMPACT_KEEP_ITEM or COUCHSTORE_COMPACT_DROP_ITEM
 *         per document. On the final (NULL) call it returns the result of
 *         couchstore_set_purge_seq().
 *
 * Policy, for documents whose rev_meta is at least DEFAULT_META_LEN bytes:
 *  - deleted: kept when ctx->drop_deletes is false. Otherwise dropped
 *    when the value in the expiry slot (the deletion time, for deletes)
 *    is before purge_before_ts and the document's db_seq is within
 *    purge_before_seq (0 means "no seqno bound"). The highest dropped
 *    db_seq is tracked in max_purged_seq.
 *  - live with a non-zero expiry before ctx->curr_time: kept, but its
 *    (rev_seq, key) is appended to ctx->expiredItems.
 * Everything else is kept.
 */
static int time_purge_hook(Db* d, DocInfo* info, void* ctx_p) {
    compaction_ctx* ctx = (compaction_ctx*) ctx_p;

    //Compaction finished
    if (info == NULL) {
        return couchstore_set_purge_seq(d, ctx->max_purged_seq);
    }

    if (info->rev_meta.size >= DEFAULT_META_LEN) {
        // Bytes [8,12) of rev_meta hold the expiry, stored big-endian.
        uint32_t exptime;
        memcpy(&exptime, info->rev_meta.buf + 8, 4);
        exptime = ntohl(exptime);
        if (info->deleted) {
            if (!ctx->drop_deletes) { // caller wants to retain deleted items
                return COUCHSTORE_COMPACT_KEEP_ITEM;
            }
            if (exptime < ctx->purge_before_ts &&
                    (!ctx->purge_before_seq ||
                     info->db_seq <= ctx->purge_before_seq)) {
                if (ctx->max_purged_seq < info->db_seq) {
                    ctx->max_purged_seq = info->db_seq;
                }
                return COUCHSTORE_COMPACT_DROP_ITEM;
            }
        } else if (exptime && exptime < ctx->curr_time) {
            // Expired live item: keep it on disk for now, but report it so
            // the caller can expire it.
            std::string keyStr(info->id.buf, info->id.size);
            expiredItemCtx expItem = { info->rev_seq, keyStr };
            ctx->expiredItems.push_back(expItem);
        }
    }

    return COUCHSTORE_COMPACT_KEEP_ITEM;
}
677
678bool CouchKVStore::compactVBucket(const uint16_t vbid,
679                                  compaction_ctx *hook_ctx,
680                                  Callback<compaction_ctx> &cb,
681                                  Callback<kvstats_ctx> &kvcb) {
682    couchstore_compact_hook       hook = time_purge_hook;
683    const couch_file_ops     *def_iops = couchstore_get_default_file_ops();
684    Db                      *compactdb = NULL;
685    Db                       *targetDb = NULL;
686    uint64_t                   fileRev = dbFileRevMap[vbid];
687    uint64_t                   new_rev = fileRev + 1;
688    couchstore_error_t         errCode = COUCHSTORE_SUCCESS;
689    hrtime_t                     start = gethrtime();
690    uint64_t              newHeaderPos = 0;
691    std::string                 dbfile;
692    std::string           compact_file;
693    std::string               new_file;
694    kvstats_ctx                  kvctx;
695    DbInfo                        info;
696
697    // Open the source VBucket database file ...
698    errCode = openDB(vbid, fileRev, &compactdb,
699                     (uint64_t)COUCHSTORE_OPEN_FLAG_RDONLY, NULL);
700    if (errCode != COUCHSTORE_SUCCESS) {
701        LOG(EXTENSION_LOG_WARNING,
702                "Warning: failed to open database, vbucketId = %d "
703                "fileRev = %llu", vbid, fileRev);
704        return notifyCompaction(vbid, new_rev, VB_COMPACT_OPENDB_ERROR, 0);
705    }
706
707    // Build the temporary vbucket.compact file name
708    dbfile       = getDBFileName(dbname, vbid, fileRev);
709    compact_file = dbfile + ".compact";
710
711    // Perform COMPACTION of vbucket.couch.rev into vbucket.couch.rev.compact
712    errCode = couchstore_compact_db_ex(compactdb, compact_file.c_str(), 0,
713                                       hook, hook_ctx, def_iops);
714    if (errCode != COUCHSTORE_SUCCESS) {
715        LOG(EXTENSION_LOG_WARNING,
716            "Warning: failed to compact database with name=%s "
717            "error=%s errno=%s",
718            dbfile.c_str(),
719            couchstore_strerror(errCode),
720            couchkvstore_strerrno(compactdb, errCode).c_str());
721        closeDatabaseHandle(compactdb);
722
723        return notifyCompaction(vbid, new_rev, VB_COMPACT_OPENDB_ERROR, 0);
724    }
725
726    // Close the source Database File once compaction is done
727    closeDatabaseHandle(compactdb);
728
729    // Rename the .compact file to one with the next revision number
730    new_file = getDBFileName(dbname, vbid, new_rev);
731    if (rename(compact_file.c_str(), new_file.c_str()) != 0) {
732        LOG(EXTENSION_LOG_WARNING,
733            "Warning: failed to rename '%s' to '%s': %s",
734            compact_file.c_str(), new_file.c_str(),
735            getSystemStrerror().c_str());
736
737        removeCompactFile(compact_file);
738        return notifyCompaction(vbid, new_rev, VB_COMPACT_RENAME_ERROR, 0);
739    }
740
741    // Open the newly compacted VBucket database file ...
742    errCode = openDB(vbid, new_rev, &targetDb,
743                     (uint64_t)COUCHSTORE_OPEN_FLAG_RDONLY, NULL);
744    if (errCode != COUCHSTORE_SUCCESS) {
745        LOG(EXTENSION_LOG_WARNING,
746                "Warning: failed to open compacted database file %s "
747                "fileRev = %llu", new_file.c_str(), new_rev);
748        if (remove(new_file.c_str()) != 0) {
749            LOG(EXTENSION_LOG_WARNING, NULL,
750                "Warning: Failed to remove '%s': %s",
751                new_file.c_str(), getSystemStrerror().c_str());
752        }
753        return notifyCompaction(vbid, new_rev, VB_COMPACT_OPENDB_ERROR, 0);
754    }
755
756    // Update the global VBucket file map so all operations use the new file
757    updateDbFileMap(vbid, new_rev);
758
759    LOG(EXTENSION_LOG_INFO,
760            "INFO: created new couch db file, name=%s rev=%llu",
761            new_file.c_str(), new_rev);
762
763    // Update stats to caller
764    kvctx.vbucket = vbid;
765    couchstore_db_info(targetDb, &info);
766    kvctx.fileSpaceUsed = info.space_used;
767    kvctx.fileSize = info.file_size;
768    kvcb.callback(kvctx);
769
770    // Notify MCCouch that compaction is Done...
771    newHeaderPos = couchstore_get_header_position(targetDb);
772    closeDatabaseHandle(targetDb);
773
774    bool retVal = notifyCompaction(vbid, new_rev, VB_COMPACTION_DONE,
775                                   newHeaderPos);
776
777    if (hook_ctx->expiredItems.size()) {
778        cb.callback(*hook_ctx);
779    }
780
781    st.compactHisto.add((gethrtime() - start) / 1000);
782    return retVal;
783}
784
785bool CouchKVStore::notifyCompaction(const uint16_t vbid, uint64_t new_rev,
786                                    uint32_t result, uint64_t header_pos) {
787    RememberingCallback<uint16_t> lcb;
788
789    VBStateNotification vbs(0, 0, result, vbid);
790
791    couchNotifier->notify_update(vbs, new_rev, header_pos, lcb);
792    if (lcb.val != PROTOCOL_BINARY_RESPONSE_SUCCESS) {
793        LOG(EXTENSION_LOG_WARNING,
794                "Warning: compactor failed to notify mccouch on vbucket "
795                "%d. err %d", vbid, lcb.val);
796        return false;
797    }
798    return true;
799}
800
/**
 * Persist the vbucket states in @p vbstates that differ from the cache.
 *
 * Entries are visited in reverse key order. For a vbucket already cached:
 *  - a state change sets VB_STATE_CHANGED and a checkpoint-id change sets
 *    VB_CHECKPOINT_CHANGED; either one requests a notification;
 *  - if neither changed and the failover log compares equal, the entry
 *    is skipped;
 *  - otherwise the cache is updated, and the cached maxDeletedSeqno is
 *    carried into the state being written.
 * A vbucket not yet cached is inserted and written as VB_STATE_CHANGED
 * with notification.
 *
 * @param cb  passed through to setVBucketState()
 * @return false (after logging) on the first setVBucketState() failure,
 *         which stops processing; true otherwise.
 */
bool CouchKVStore::snapshotVBuckets(const vbucket_map_t &vbstates,
                                    Callback<kvstats_ctx> *cb)
{
    cb_assert(!isReadOnly());
    bool success = true;

    vbucket_map_t::const_reverse_iterator iter = vbstates.rbegin();
    for (; iter != vbstates.rend(); ++iter) {
        bool notify = false;
        uint16_t vbucketId = iter->first;
        // Local copy: maxDeletedSeqno may be overwritten below.
        vbucket_state vbstate = iter->second;
        vbucket_map_t::iterator it = cachedVBStates.find(vbucketId);
        uint32_t vb_change_type = VB_NO_CHANGE;
        if (it != cachedVBStates.end()) {
            if (it->second.state != vbstate.state) {
                vb_change_type |= VB_STATE_CHANGED;
                notify = true;
            }
            if (it->second.checkpointId != vbstate.checkpointId) {
                vb_change_type |= VB_CHECKPOINT_CHANGED;
                notify = true;
            }

            if (it->second.failovers.compare(vbstate.failovers) == 0 &&
                vb_change_type == VB_NO_CHANGE) {
                continue; // no changes
            }
            it->second.state = vbstate.state;
            it->second.checkpointId = vbstate.checkpointId;
            it->second.failovers = vbstate.failovers;
            // Note that the max deleted seq number is maintained within CouchKVStore
            vbstate.maxDeletedSeqno = it->second.maxDeletedSeqno;
        } else {
            vb_change_type = VB_STATE_CHANGED;
            cachedVBStates[vbucketId] = vbstate;
            notify = true;
        }

        success = setVBucketState(vbucketId, vbstate, vb_change_type, cb,
                                  notify);
        if (!success) {
            LOG(EXTENSION_LOG_WARNING,
                "Warning: failed to set new state, %s, for vbucket %d\n",
                VBucket::toString(vbstate.state), vbucketId);
            break;
        }
    }
    return success;
}
850
851bool CouchKVStore::snapshotStats(const std::map<std::string, std::string> &stats)
852{
853    cb_assert(!isReadOnly());
854    size_t count = 0;
855    size_t size = stats.size();
856    std::stringstream stats_buf;
857    stats_buf << "{";
858    std::map<std::string, std::string>::const_iterator it = stats.begin();
859    for (; it != stats.end(); ++it) {
860        stats_buf << "\"" << it->first << "\": \"" << it->second << "\"";
861        ++count;
862        if (count < size) {
863            stats_buf << ", ";
864        }
865    }
866    stats_buf << "}";
867
868    // TODO: This stats json should be written into the master database. However,
869    // we don't support the write synchronization between CouchKVStore in C++ and
870    // compaction manager in the erlang side for the master database yet. At this time,
871    // we simply log the engine stats into a separate json file. As part of futhre work,
872    // we need to get rid of the tight coupling between those two components.
873    bool rv = true;
874    std::string next_fname = dbname + "/stats.json.new";
875    std::ofstream new_stats;
876    new_stats.exceptions (new_stats.failbit | new_stats.badbit);
877    try {
878        new_stats.open(next_fname.c_str());
879        new_stats << stats_buf.str().c_str() << std::endl;
880        new_stats.flush();
881        new_stats.close();
882    } catch (const std::ofstream::failure& e) {
883        LOG(EXTENSION_LOG_WARNING, "Warning: failed to log the engine stats to "
884            "file \"%s\" due to IO exception \"%s\"; Not critical because new "
885            "stats will be dumped later, please ignore.",
886            next_fname.c_str(), e.what());
887        rv = false;
888    }
889
890    if (rv) {
891        std::string old_fname = dbname + "/stats.json.old";
892        std::string stats_fname = dbname + "/stats.json";
893        if (access(old_fname.c_str(), F_OK) == 0 && remove(old_fname.c_str()) != 0) {
894            LOG(EXTENSION_LOG_WARNING, "FATAL: Failed to remove '%s': %s",
895                old_fname.c_str(), strerror(errno));
896            remove(next_fname.c_str());
897            rv = false;
898        } else if (access(stats_fname.c_str(), F_OK) == 0 &&
899                   rename(stats_fname.c_str(), old_fname.c_str()) != 0) {
900            LOG(EXTENSION_LOG_WARNING,
901                "Warning: failed to rename '%s' to '%s': %s",
902                stats_fname.c_str(), old_fname.c_str(), strerror(errno));
903            remove(next_fname.c_str());
904            rv = false;
905        } else if (rename(next_fname.c_str(), stats_fname.c_str()) != 0) {
906            LOG(EXTENSION_LOG_WARNING,
907                "Warning: failed to rename '%s' to '%s': %s",
908                next_fname.c_str(), stats_fname.c_str(), strerror(errno));
909            remove(next_fname.c_str());
910            rv = false;
911        }
912    }
913
914    return rv;
915}
916
917bool CouchKVStore::setVBucketState(uint16_t vbucketId, vbucket_state &vbstate,
918                                   uint32_t vb_change_type,
919                                   Callback<kvstats_ctx> *kvcb,
920                                   bool notify)
921{
922    Db *db = NULL;
923    uint64_t fileRev, newFileRev;
924    std::stringstream id;
925    std::string dbFileName;
926    std::map<uint16_t, uint64_t>::iterator mapItr;
927    kvstats_ctx kvctx;
928    kvctx.vbucket = vbucketId;
929
930    id << vbucketId;
931    dbFileName = dbname + "/" + id.str() + ".couch." + id.str();
932    fileRev = dbFileRevMap[vbucketId];
933
934    couchstore_error_t errorCode;
935    errorCode = openDB(vbucketId, fileRev, &db,
936            (uint64_t)COUCHSTORE_OPEN_FLAG_CREATE, &newFileRev);
937    if (errorCode != COUCHSTORE_SUCCESS) {
938        ++st.numVbSetFailure;
939        LOG(EXTENSION_LOG_WARNING,
940                "Warning: failed to open database, name=%s",
941                dbFileName.c_str());
942        return false;
943    }
944
945    fileRev = newFileRev;
946    errorCode = saveVBState(db, vbstate);
947    if (errorCode != COUCHSTORE_SUCCESS) {
948        ++st.numVbSetFailure;
949        LOG(EXTENSION_LOG_WARNING,
950                "Warning: failed to save local doc, name=%s",
951                dbFileName.c_str());
952        closeDatabaseHandle(db);
953        return false;
954    }
955
956    errorCode = couchstore_commit(db);
957    if (errorCode != COUCHSTORE_SUCCESS) {
958        ++st.numVbSetFailure;
959        LOG(EXTENSION_LOG_WARNING,
960                "Warning: commit failed, vbid=%u rev=%llu error=%s [%s]",
961                vbucketId, fileRev, couchstore_strerror(errorCode),
962                couchkvstore_strerrno(db, errorCode).c_str());
963        closeDatabaseHandle(db);
964        return false;
965    } else if (notify) {
966        uint64_t newHeaderPos = couchstore_get_header_position(db);
967        RememberingCallback<uint16_t> lcb;
968
969        VBStateNotification vbs(vbstate.checkpointId, vbstate.state,
970                vb_change_type, vbucketId);
971
972        couchNotifier->notify_update(vbs, fileRev, newHeaderPos, lcb);
973        if (lcb.val != PROTOCOL_BINARY_RESPONSE_SUCCESS) {
974            cb_assert(lcb.val != PROTOCOL_BINARY_RESPONSE_ETMPFAIL);
975            LOG(EXTENSION_LOG_WARNING,
976                    "Warning: failed to notify CouchDB of update, "
977                    "vbid=%u rev=%llu error=0x%x\n",
978                    vbucketId, fileRev, lcb.val);
979            if (!epStats.isShutdown) {
980                closeDatabaseHandle(db);
981                return false;
982            }
983        }
984    }
985    if (kvcb) {
986        DbInfo info;
987        couchstore_db_info(db, &info);
988        kvctx.fileSpaceUsed = info.space_used;
989        kvctx.fileSize = info.file_size;
990        kvcb->callback(kvctx);
991    }
992    closeDatabaseHandle(db);
993
994    return true;
995}
996
997void CouchKVStore::dump(std::vector<uint16_t> &vbids,
998                        shared_ptr<Callback<GetValue> > cb,
999                        shared_ptr<Callback<CacheLookup> > cl)
1000{
1001    std::vector<uint16_t>::iterator itr = vbids.begin();
1002    for (; itr != vbids.end(); ++itr) {
1003        loadDB(cb, cl, false, *itr, 0, std::numeric_limits<uint64_t>::max(),
1004               COUCHSTORE_NO_DELETES);
1005    }
1006}
1007
1008void CouchKVStore::dump(uint16_t vb, uint64_t stSeqno, uint64_t enSeqno,
1009                        shared_ptr<Callback<GetValue> > cb,
1010                        shared_ptr<Callback<CacheLookup> > cl)
1011{
1012    std::vector<uint16_t> vbids;
1013    vbids.push_back(vb);
1014    loadDB(cb, cl, false, vb, stSeqno, enSeqno);
1015}
1016
1017void CouchKVStore::dumpKeys(std::vector<uint16_t> &vbids,  shared_ptr<Callback<GetValue> > cb)
1018{
1019    shared_ptr<Callback<CacheLookup> > cl(new NoLookupCallback());
1020    std::vector<uint16_t>::iterator itr = vbids.begin();
1021    for (; itr != vbids.end(); ++itr) {
1022        loadDB(cb, cl, true, *itr, 0, std::numeric_limits<uint64_t>::max(),
1023               COUCHSTORE_NO_DELETES);
1024    }
1025}
1026
1027void CouchKVStore::dumpDeleted(uint16_t vb, uint64_t stSeqno, uint64_t enSeqno,
1028                               shared_ptr<Callback<GetValue> > cb)
1029{
1030    std::vector<uint16_t> vbids;
1031    vbids.push_back(vb);
1032    shared_ptr<Callback<CacheLookup> > cl(new NoLookupCallback());
1033    loadDB(cb, cl, true, vb, stSeqno, enSeqno, COUCHSTORE_DELETES_ONLY);
1034}
1035
1036StorageProperties CouchKVStore::getStorageProperties()
1037{
1038    StorageProperties rv(true, true, true, true);
1039    return rv;
1040}
1041
1042bool CouchKVStore::commit(Callback<kvstats_ctx> *cb)
1043{
1044    cb_assert(!isReadOnly());
1045    if (intransaction) {
1046        intransaction = commit2couchstore(cb) ? false : true;
1047    }
1048    return !intransaction;
1049
1050}
1051
1052void CouchKVStore::addStats(const std::string &prefix,
1053                            ADD_STAT add_stat,
1054                            const void *c)
1055{
1056    const char *prefix_str = prefix.c_str();
1057
1058    /* stats for both read-only and read-write threads */
1059    addStat(prefix_str, "backend_type",   "couchstore",       add_stat, c);
1060    addStat(prefix_str, "open",           st.numOpen,         add_stat, c);
1061    addStat(prefix_str, "close",          st.numClose,        add_stat, c);
1062    addStat(prefix_str, "readTime",       st.readTimeHisto,   add_stat, c);
1063    addStat(prefix_str, "readSize",       st.readSizeHisto,   add_stat, c);
1064    addStat(prefix_str, "numLoadedVb",    st.numLoadedVb,     add_stat, c);
1065
1066    // failure stats
1067    addStat(prefix_str, "failure_open",   st.numOpenFailure, add_stat, c);
1068    addStat(prefix_str, "failure_get",    st.numGetFailure,  add_stat, c);
1069
1070    if (!isReadOnly()) {
1071        addStat(prefix_str, "failure_set",   st.numSetFailure,   add_stat, c);
1072        addStat(prefix_str, "failure_del",   st.numDelFailure,   add_stat, c);
1073        addStat(prefix_str, "failure_vbset", st.numVbSetFailure, add_stat, c);
1074        addStat(prefix_str, "lastCommDocs",  st.docsCommitted,   add_stat, c);
1075
1076        // stats for CouchNotifier
1077        couchNotifier->addStats(prefix, add_stat, c);
1078    }
1079}
1080
1081void CouchKVStore::addTimingStats(const std::string &prefix,
1082                                  ADD_STAT add_stat, const void *c) {
1083    if (isReadOnly()) {
1084        return;
1085    }
1086    const char *prefix_str = prefix.c_str();
1087    addStat(prefix_str, "commit",      st.commitHisto,      add_stat, c);
1088    addStat(prefix_str, "compact",     st.compactHisto,     add_stat, c);
1089    addStat(prefix_str, "delete",      st.delTimeHisto,     add_stat, c);
1090    addStat(prefix_str, "save_documents", st.saveDocsHisto, add_stat, c);
1091    addStat(prefix_str, "writeTime",   st.writeTimeHisto,   add_stat, c);
1092    addStat(prefix_str, "writeSize",   st.writeSizeHisto,   add_stat, c);
1093    addStat(prefix_str, "bulkSize",    st.batchSize,        add_stat, c);
1094
1095    // Couchstore file ops stats
1096    addStat(prefix_str, "fsReadTime",  st.fsStats.readTimeHisto,  add_stat, c);
1097    addStat(prefix_str, "fsWriteTime", st.fsStats.writeTimeHisto, add_stat, c);
1098    addStat(prefix_str, "fsSyncTime",  st.fsStats.syncTimeHisto,  add_stat, c);
1099    addStat(prefix_str, "fsReadSize",  st.fsStats.readSizeHisto,  add_stat, c);
1100    addStat(prefix_str, "fsWriteSize", st.fsStats.writeSizeHisto, add_stat, c);
1101    addStat(prefix_str, "fsReadSeek",  st.fsStats.readSeekHisto,  add_stat, c);
1102}
1103
1104template <typename T>
1105void CouchKVStore::addStat(const std::string &prefix, const char *stat, T &val,
1106                           ADD_STAT add_stat, const void *c)
1107{
1108    std::stringstream fullstat;
1109    fullstat << prefix << ":" << stat;
1110    add_casted_stat(fullstat.str().c_str(), val, add_stat, c);
1111}
1112
1113void CouchKVStore::optimizeWrites(std::vector<queued_item> &items)
1114{
1115    cb_assert(!isReadOnly());
1116    if (items.empty()) {
1117        return;
1118    }
1119    CompareQueuedItemsBySeqnoAndKey cq;
1120    std::sort(items.begin(), items.end(), cq);
1121}
1122
1123void CouchKVStore::loadDB(shared_ptr<Callback<GetValue> > cb,
1124                          shared_ptr<Callback<CacheLookup> > cl,
1125                          bool keysOnly, uint16_t vbid,
1126                          uint64_t startSeqno, uint64_t endSeqno,
1127                          couchstore_docinfos_options options)
1128{
1129    if (!dbFileRevMapPopulated) {
1130        // warmup, first discover db files from local directory
1131        std::vector<std::string> files;
1132        discoverDbFiles(dbname, files);
1133        populateFileNameMap(files);
1134    }
1135
1136    Db *db = NULL;
1137    uint64_t rev = dbFileRevMap[vbid];
1138    couchstore_error_t errorCode = openDB(vbid, rev, &db,
1139                                          COUCHSTORE_OPEN_FLAG_RDONLY);
1140    if (errorCode != COUCHSTORE_SUCCESS) {
1141        LOG(EXTENSION_LOG_WARNING,
1142            "Failed to open database, name=%s/%d.couch.%lu",
1143            dbname.c_str(), vbid, rev);
1144        remVBucketFromDbFileMap(vbid);
1145    } else {
1146        LoadResponseCtx ctx;
1147        ctx.vbucketId = vbid;
1148        ctx.endSeqno = endSeqno;
1149        ctx.keysonly = keysOnly;
1150        ctx.callback = cb;
1151        ctx.lookup = cl;
1152        ctx.stats = &epStats;
1153        errorCode = couchstore_changes_since(db, startSeqno, options,
1154                                             recordDbDumpC,
1155                                             static_cast<void *>(&ctx));
1156        if (errorCode != COUCHSTORE_SUCCESS) {
1157            if (errorCode == COUCHSTORE_ERROR_CANCEL) {
1158                LOG(EXTENSION_LOG_WARNING,
1159                    "Canceling loading database, warmup has completed\n");
1160            } else {
1161                LOG(EXTENSION_LOG_WARNING,
1162                    "couchstore_changes_since failed, error=%s [%s]",
1163                    couchstore_strerror(errorCode),
1164                    couchkvstore_strerrno(db, errorCode).c_str());
1165                remVBucketFromDbFileMap(vbid);
1166            }
1167        }
1168        closeDatabaseHandle(db);
1169    }
1170}
1171
1172void CouchKVStore::open()
1173{
1174    // TODO intransaction, is it needed?
1175    intransaction = false;
1176    if (!isReadOnly()) {
1177        couchNotifier = CouchNotifier::create(epStats, configuration);
1178    }
1179
1180    struct stat dbstat;
1181    bool havedir = false;
1182
1183    if (stat(dbname.c_str(), &dbstat) == 0 && (dbstat.st_mode & S_IFDIR) == S_IFDIR) {
1184        havedir = true;
1185    }
1186
1187    if (!havedir) {
1188        if (mkdir(dbname.c_str(), S_IRWXU) == -1) {
1189            std::stringstream ss;
1190            ss << "Warning: Failed to create data directory ["
1191               << dbname << "]: " << strerror(errno);
1192            throw std::runtime_error(ss.str());
1193        }
1194    }
1195}
1196
1197void CouchKVStore::close()
1198{
1199    intransaction = false;
1200    if (!isReadOnly()) {
1201        CouchNotifier::deleteNotifier();
1202    }
1203    couchNotifier = NULL;
1204}
1205
1206uint64_t CouchKVStore::checkNewRevNum(std::string &dbFileName, bool newFile)
1207{
1208    uint64_t newrev = 0;
1209    std::string nameKey;
1210
1211    if (!newFile) {
1212        // extract out the file revision number first
1213        size_t secondDot = dbFileName.rfind(".");
1214        nameKey = dbFileName.substr(0, secondDot);
1215    } else {
1216        nameKey = dbFileName;
1217    }
1218    nameKey.append(".");
1219    const std::vector<std::string> files = findFilesWithPrefix(nameKey);
1220    std::vector<std::string>::const_iterator itor;
1221    // found file(s) whoes name has the same key name pair with different
1222    // revision number
1223    for (itor = files.begin(); itor != files.end(); ++itor) {
1224        const std::string &filename = *itor;
1225        if (endWithCompact(filename)) {
1226            continue;
1227        }
1228
1229        size_t secondDot = filename.rfind(".");
1230        char *ptr = NULL;
1231        uint64_t revnum = strtoull(filename.substr(secondDot + 1).c_str(), &ptr, 10);
1232        if (newrev < revnum) {
1233            newrev = revnum;
1234            dbFileName = filename;
1235        }
1236    }
1237    return newrev;
1238}
1239
1240void CouchKVStore::updateDbFileMap(uint16_t vbucketId, uint64_t newFileRev)
1241{
1242    if (vbucketId >= numDbFiles) {
1243        LOG(EXTENSION_LOG_WARNING, NULL,
1244            "Warning: cannot update db file map for an invalid vbucket, "
1245            "vbucket id = %d, rev = %lld\n", vbucketId, newFileRev);
1246        return;
1247    }
1248
1249    dbFileRevMap[vbucketId] = newFileRev;
1250}
1251
1252couchstore_error_t CouchKVStore::openDB(uint16_t vbucketId,
1253                                        uint64_t fileRev,
1254                                        Db **db,
1255                                        uint64_t options,
1256                                        uint64_t *newFileRev)
1257{
1258    std::string dbFileName = getDBFileName(dbname, vbucketId, fileRev);
1259    couch_file_ops* ops = &statCollectingFileOps;
1260
1261    uint64_t newRevNum = fileRev;
1262    couchstore_error_t errorCode = COUCHSTORE_SUCCESS;
1263
1264    if (options == COUCHSTORE_OPEN_FLAG_CREATE) {
1265        // first try to open the requested file without the create option
1266        // in case it does already exist
1267        errorCode = couchstore_open_db_ex(dbFileName.c_str(), 0, ops, db);
1268        if (errorCode != COUCHSTORE_SUCCESS) {
1269            // open_db failed but still check if the file exists
1270            newRevNum = checkNewRevNum(dbFileName);
1271            bool fileExists = (newRevNum) ? true : false;
1272            if (fileExists) {
1273                errorCode = openDB_retry(dbFileName, 0, ops, db, &newRevNum);
1274            } else {
1275                // requested file doesn't seem to exist, just create one
1276                errorCode = couchstore_open_db_ex(dbFileName.c_str(), options,
1277                                                  ops, db);
1278                if (errorCode == COUCHSTORE_SUCCESS) {
1279                    newRevNum = 1;
1280                    updateDbFileMap(vbucketId, fileRev);
1281                    LOG(EXTENSION_LOG_INFO,
1282                        "INFO: created new couch db file, name=%s rev=%llu",
1283                        dbFileName.c_str(), fileRev);
1284                }
1285            }
1286        }
1287    } else {
1288        errorCode = openDB_retry(dbFileName, options, ops, db, &newRevNum);
1289    }
1290
1291    /* update command statistics */
1292    st.numOpen++;
1293    if (errorCode) {
1294        st.numOpenFailure++;
1295        LOG(EXTENSION_LOG_WARNING, "Warning: couchstore_open_db failed, name=%s"
1296            " option=%X rev=%llu error=%s [%s]\n", dbFileName.c_str(), options,
1297            ((newRevNum > fileRev) ? newRevNum : fileRev),
1298            couchstore_strerror(errorCode),
1299            getSystemStrerror().c_str());
1300    } else {
1301        if (newRevNum > fileRev) {
1302            // new revision number found, update it
1303            updateDbFileMap(vbucketId, newRevNum);
1304        }
1305    }
1306
1307    if (newFileRev != NULL) {
1308        *newFileRev = (newRevNum > fileRev) ? newRevNum : fileRev;
1309    }
1310    return errorCode;
1311}
1312
1313couchstore_error_t CouchKVStore::openDB_retry(std::string &dbfile,
1314                                              uint64_t options,
1315                                              const couch_file_ops *ops,
1316                                              Db** db, uint64_t *newFileRev)
1317{
1318    int retry = 0;
1319    couchstore_error_t errCode = COUCHSTORE_SUCCESS;
1320
1321    while (retry < MAX_OPEN_DB_RETRY) {
1322        errCode = couchstore_open_db_ex(dbfile.c_str(), options, ops, db);
1323        if (errCode == COUCHSTORE_SUCCESS) {
1324            return errCode;
1325        }
1326        LOG(EXTENSION_LOG_INFO, "INFO: couchstore_open_db failed, name=%s "
1327            "options=%X error=%s [%s], try it again!",
1328            dbfile.c_str(), options, couchstore_strerror(errCode),
1329            getSystemStrerror().c_str());
1330        *newFileRev = checkNewRevNum(dbfile);
1331        ++retry;
1332        if (retry == MAX_OPEN_DB_RETRY - 1 && options == 0 &&
1333            errCode == COUCHSTORE_ERROR_NO_SUCH_FILE) {
1334            options = COUCHSTORE_OPEN_FLAG_CREATE;
1335        }
1336    }
1337    return errCode;
1338}
1339
1340void CouchKVStore::populateFileNameMap(std::vector<std::string> &filenames)
1341{
1342    std::vector<std::string>::iterator fileItr;
1343
1344    for (fileItr = filenames.begin(); fileItr != filenames.end(); ++fileItr) {
1345        const std::string &filename = *fileItr;
1346        size_t secondDot = filename.rfind(".");
1347        std::string nameKey = filename.substr(0, secondDot);
1348        size_t firstDot = nameKey.rfind(".");
1349        size_t firstSlash = nameKey.rfind("/");
1350
1351        std::string revNumStr = filename.substr(secondDot + 1);
1352        char *ptr = NULL;
1353        uint64_t revNum = strtoull(revNumStr.c_str(), &ptr, 10);
1354
1355        std::string vbIdStr = nameKey.substr(firstSlash + 1, (firstDot - firstSlash) - 1);
1356        if (allDigit(vbIdStr)) {
1357            uint64_t old_rev_num = 0;
1358            int vbId = atoi(vbIdStr.c_str());
1359            if (dbFileRevMap[vbId] == revNum) {
1360                continue;
1361            } else if (dbFileRevMap[vbId] < revNum) {
1362                old_rev_num = dbFileRevMap[vbId];
1363                dbFileRevMap[vbId] = revNum;
1364            } else {
1365                old_rev_num = revNum;
1366            }
1367            std::stringstream old_file;
1368            old_file << dbname << "/" << vbId << ".couch." << old_rev_num;
1369            if (access(old_file.str().c_str(), F_OK) == 0) {
1370                if (remove(old_file.str().c_str()) != 0) {
1371                    LOG(EXTENSION_LOG_WARNING,
1372                        "Warning: Failed to remove the stale file '%s': %s",
1373                        old_file.str().c_str(), getSystemStrerror().c_str());
1374                }
1375            }
1376        } else {
1377            // skip non-vbucket database file, master.couch etc
1378            LOG(EXTENSION_LOG_DEBUG,
1379                "Non-vbucket database file, %s, skip adding "
1380                "to CouchKVStore dbFileMap\n", filename.c_str());
1381        }
1382    }
1383    dbFileRevMapPopulated = true;
1384}
1385
1386couchstore_error_t CouchKVStore::fetchDoc(Db *db, DocInfo *docinfo,
1387                                          GetValue &docValue, uint16_t vbId,
1388                                          bool metaOnly, bool fetchDelete)
1389{
1390    couchstore_error_t errCode = COUCHSTORE_SUCCESS;
1391    sized_buf metadata = docinfo->rev_meta;
1392    uint32_t itemFlags;
1393    uint64_t cas;
1394    time_t exptime;
1395    uint8_t *ext_meta = NULL;
1396    uint8_t ext_len;
1397
1398    cb_assert(metadata.size >= DEFAULT_META_LEN);
1399    if (metadata.size == DEFAULT_META_LEN) {
1400        memcpy(&cas, (metadata.buf), 8);
1401        memcpy(&exptime, (metadata.buf) + 8, 4);
1402        memcpy(&itemFlags, (metadata.buf) + 12, 4);
1403        ext_len = 0;
1404    } else {
1405        //metadata.size => 18, FLEX_META_CODE at offset 16
1406        memcpy(&cas, (metadata.buf), 8);
1407        memcpy(&exptime, (metadata.buf) + 8, 4);
1408        memcpy(&itemFlags, (metadata.buf) + 12, 4);
1409        ext_len = metadata.size - DEFAULT_META_LEN - FLEX_DATA_OFFSET;
1410        ext_meta = (uint8_t *)malloc(sizeof(uint8_t) * ext_len);
1411        memcpy(ext_meta, (metadata.buf) + DEFAULT_META_LEN + FLEX_DATA_OFFSET,
1412               ext_len);
1413    }
1414    cas = ntohll(cas);
1415    exptime = ntohl(exptime);
1416
1417    if (metaOnly || (fetchDelete && docinfo->deleted)) {
1418        Item *it = new Item(docinfo->id.buf, (size_t)docinfo->id.size,
1419                            docinfo->size, itemFlags, (time_t)exptime,
1420                            ext_meta, ext_len, cas, docinfo->db_seq, vbId);
1421        if (docinfo->deleted) {
1422            it->setDeleted();
1423        }
1424        it->setRevSeqno(docinfo->rev_seq);
1425        docValue = GetValue(it);
1426        // update ep-engine IO stats
1427        ++epStats.io_num_read;
1428        epStats.io_read_bytes.fetch_add(docinfo->id.size);
1429    } else {
1430        Doc *doc = NULL;
1431        errCode = couchstore_open_doc_with_docinfo(db, docinfo, &doc,
1432                                                   DECOMPRESS_DOC_BODIES);
1433        if (errCode == COUCHSTORE_SUCCESS) {
1434            if (docinfo->deleted) {
1435                // do not read a doc that is marked deleted, just return the
1436                // error code as not found but still release the document body.
1437                errCode = COUCHSTORE_ERROR_DOC_NOT_FOUND;
1438            } else {
1439                cb_assert(doc && (doc->id.size <= UINT16_MAX));
1440                size_t valuelen = doc->data.size;
1441                void *valuePtr = doc->data.buf;
1442                Item *it = new Item(docinfo->id.buf, (size_t)docinfo->id.size,
1443                                    itemFlags, (time_t)exptime, valuePtr, valuelen,
1444                                    ext_meta, ext_len, cas, docinfo->db_seq, vbId,
1445                                    docinfo->rev_seq);
1446                docValue = GetValue(it);
1447
1448                // update ep-engine IO stats
1449                ++epStats.io_num_read;
1450                epStats.io_read_bytes.fetch_add(docinfo->id.size + valuelen);
1451            }
1452            couchstore_free_document(doc);
1453        }
1454    }
1455    free(ext_meta);
1456    return errCode;
1457}
1458
1459int CouchKVStore::recordDbDump(Db *db, DocInfo *docinfo, void *ctx)
1460{
1461    LoadResponseCtx *loadCtx = (LoadResponseCtx *)ctx;
1462    shared_ptr<Callback<GetValue> > cb = loadCtx->callback;
1463    shared_ptr<Callback<CacheLookup> > cl = loadCtx->lookup;
1464
1465    Doc *doc = NULL;
1466    void *valuePtr = NULL;
1467    size_t valuelen = 0;
1468    uint64_t byseqno = docinfo->db_seq;
1469    sized_buf  metadata = docinfo->rev_meta;
1470    uint16_t vbucketId = loadCtx->vbucketId;
1471    sized_buf key = docinfo->id;
1472    uint32_t itemflags;
1473    uint64_t cas;
1474    uint32_t exptime;
1475    uint8_t *ext_meta = NULL;
1476    uint8_t ext_len;
1477
1478    cb_assert(key.size <= UINT16_MAX);
1479    cb_assert(metadata.size >= DEFAULT_META_LEN);
1480
1481    if (byseqno > loadCtx->endSeqno) {
1482        return COUCHSTORE_ERROR_CANCEL;
1483    }
1484
1485    std::string docKey(docinfo->id.buf, docinfo->id.size);
1486    CacheLookup lookup(docKey, byseqno, vbucketId);
1487    cl->callback(lookup);
1488    if (cl->getStatus() == ENGINE_KEY_EEXISTS) {
1489        return COUCHSTORE_SUCCESS;
1490    }
1491
1492    if (metadata.size == DEFAULT_META_LEN) {
1493        memcpy(&cas, (metadata.buf), 8);
1494        memcpy(&exptime, (metadata.buf) + 8, 4);
1495        memcpy(&itemflags, (metadata.buf) + 12, 4);
1496        ext_len = 0;
1497    } else {
1498        //metadata.size > 16, FLEX_META_CODE at offset 16
1499        memcpy(&cas, (metadata.buf), 8);
1500        memcpy(&exptime, (metadata.buf) + 8, 4);
1501        memcpy(&itemflags, (metadata.buf) + 12, 4);
1502        ext_len = metadata.size - DEFAULT_META_LEN - FLEX_DATA_OFFSET;
1503        ext_meta = (uint8_t *)malloc(sizeof(uint8_t) * ext_len);
1504        memcpy(ext_meta, (metadata.buf) + DEFAULT_META_LEN + FLEX_DATA_OFFSET,
1505               ext_len);
1506    }
1507    exptime = ntohl(exptime);
1508    cas = ntohll(cas);
1509
1510    if (!loadCtx->keysonly && !docinfo->deleted) {
1511        couchstore_error_t errCode ;
1512        errCode = couchstore_open_doc_with_docinfo(db, docinfo, &doc,
1513                                                   DECOMPRESS_DOC_BODIES);
1514
1515        if (errCode == COUCHSTORE_SUCCESS) {
1516            if (doc->data.size) {
1517                valuelen = doc->data.size;
1518                valuePtr = doc->data.buf;
1519            }
1520        } else {
1521            LOG(EXTENSION_LOG_WARNING,
1522                "Warning: failed to retrieve key value from database "
1523                "database, vBucket=%d key=%s error=%s [%s]\n",
1524                vbucketId, key.buf, couchstore_strerror(errCode),
1525                couchkvstore_strerrno(db, errCode).c_str());
1526            free(ext_meta);
1527            return COUCHSTORE_SUCCESS;
1528        }
1529    }
1530
1531    Item *it = new Item((void *)key.buf,
1532                        key.size,
1533                        itemflags,
1534                        (time_t)exptime,
1535                        valuePtr, valuelen,
1536                        ext_meta, ext_len,
1537                        cas,
1538                        docinfo->db_seq, // return seq number being persisted on disk
1539                        vbucketId,
1540                        docinfo->rev_seq);
1541    if (docinfo->deleted) {
1542        it->setDeleted();
1543    }
1544
1545
1546    GetValue rv(it, ENGINE_SUCCESS, -1, loadCtx->keysonly);
1547    cb->callback(rv);
1548
1549    couchstore_free_document(doc);
1550
1551    free(ext_meta);
1552
1553    if (cb->getStatus() == ENGINE_ENOMEM) {
1554        return COUCHSTORE_ERROR_CANCEL;
1555    }
1556    return COUCHSTORE_SUCCESS;
1557}
1558
1559bool CouchKVStore::commit2couchstore(Callback<kvstats_ctx> *cb)
1560{
1561    bool success = true;
1562
1563    size_t pendingCommitCnt = pendingReqsQ.size();
1564    if (pendingCommitCnt == 0) {
1565        return success;
1566    }
1567
1568    Doc **docs = new Doc *[pendingCommitCnt];
1569    DocInfo **docinfos = new DocInfo *[pendingCommitCnt];
1570
1571    cb_assert(pendingReqsQ[0]);
1572    uint16_t vbucket2flush = pendingReqsQ[0]->getVBucketId();
1573    uint64_t fileRev = pendingReqsQ[0]->getRevNum();
1574    for (size_t i = 0; i < pendingCommitCnt; ++i) {
1575        CouchRequest *req = pendingReqsQ[i];
1576        cb_assert(req);
1577        docs[i] = req->getDbDoc();
1578        docinfos[i] = req->getDbDocInfo();
1579        cb_assert(vbucket2flush == req->getVBucketId());
1580    }
1581
1582    kvstats_ctx kvctx;
1583    kvctx.vbucket = vbucket2flush;
1584    // flush all
1585    couchstore_error_t errCode = saveDocs(vbucket2flush, fileRev, docs,
1586                                          docinfos, pendingCommitCnt, kvctx);
1587    if (errCode) {
1588        LOG(EXTENSION_LOG_WARNING,
1589            "Warning: commit failed, cannot save CouchDB docs "
1590            "for vbucket = %d rev = %llu\n", vbucket2flush, fileRev);
1591        ++epStats.commitFailed;
1592    }
1593    if (cb) {
1594        cb->callback(kvctx);
1595    }
1596    commitCallback(pendingReqsQ, kvctx, errCode);
1597
1598    // clean up
1599    for (size_t i = 0; i < pendingCommitCnt; ++i) {
1600        delete pendingReqsQ[i];
1601    }
1602    pendingReqsQ.clear();
1603    delete [] docs;
1604    delete [] docinfos;
1605    return success;
1606}
1607
1608static int readDocInfos(Db *db, DocInfo *docinfo, void *ctx)
1609{
1610    cb_assert(ctx);
1611    kvstats_ctx *cbCtx = static_cast<kvstats_ctx *>(ctx);
1612    if(docinfo) {
1613        // An item exists in the VB DB file.
1614        if (!docinfo->deleted) {
1615            std::string key(docinfo->id.buf, docinfo->id.size);
1616            unordered_map<std::string, kstat_entry_t>::iterator itr =
1617                cbCtx->keyStats.find(key);
1618            if (itr != cbCtx->keyStats.end()) {
1619                itr->second.first = true;
1620            }
1621        }
1622    }
1623    return 0;
1624}
1625
1626couchstore_error_t CouchKVStore::saveDocs(uint16_t vbid, uint64_t rev, Doc **docs,
1627                                          DocInfo **docinfos, size_t docCount,
1628                                          kvstats_ctx &kvctx)
1629{
1630    couchstore_error_t errCode;
1631    uint64_t fileRev = rev;
1632    DbInfo info;
1633    cb_assert(fileRev);
1634
1635    Db *db = NULL;
1636    uint64_t newFileRev;
1637    errCode = openDB(vbid, fileRev, &db, 0, &newFileRev);
1638    if (errCode != COUCHSTORE_SUCCESS) {
1639        LOG(EXTENSION_LOG_WARNING,
1640                "Warning: failed to open database, vbucketId = %d "
1641                "fileRev = %llu numDocs = %d", vbid, fileRev, docCount);
1642        return errCode;
1643    } else {
1644        uint64_t max = computeMaxDeletedSeqNum(docinfos, docCount);
1645
1646        // update max_deleted_seq in the local doc (vbstate)
1647        // before save docs for the given vBucket
1648        if (max > 0) {
1649            vbucket_map_t::iterator it =
1650                cachedVBStates.find(vbid);
1651            if (it != cachedVBStates.end() && it->second.maxDeletedSeqno < max) {
1652                it->second.maxDeletedSeqno = max;
1653                errCode = saveVBState(db, it->second);
1654                if (errCode != COUCHSTORE_SUCCESS) {
1655                    LOG(EXTENSION_LOG_WARNING,
1656                            "Warning: failed to save local doc for, "
1657                            "vBucket = %d numDocs = %d\n", vbid, docCount);
1658                    closeDatabaseHandle(db);
1659                    return errCode;
1660                }
1661            }
1662        }
1663
1664        sized_buf *ids = new sized_buf[docCount];
1665        for (size_t idx = 0; idx < docCount; idx++) {
1666            ids[idx] = docinfos[idx]->id;
1667            std::string key(ids[idx].buf, ids[idx].size);
1668            kvctx.keyStats[key] = std::make_pair(false,
1669                    !docinfos[idx]->deleted);
1670        }
1671        couchstore_docinfos_by_id(db, ids, (unsigned) docCount, 0,
1672                readDocInfos, &kvctx);
1673        delete[] ids;
1674
1675        hrtime_t cs_begin = gethrtime();
1676        uint64_t flags = COMPRESS_DOC_BODIES | COUCHSTORE_SEQUENCE_AS_IS;
1677        errCode = couchstore_save_documents(db, docs, docinfos,
1678                (unsigned) docCount, flags);
1679        st.saveDocsHisto.add((gethrtime() - cs_begin) / 1000);
1680        if (errCode != COUCHSTORE_SUCCESS) {
1681            LOG(EXTENSION_LOG_WARNING,
1682                    "Warning: failed to save docs to database, numDocs = %d "
1683                    "error=%s [%s]\n", docCount, couchstore_strerror(errCode),
1684                    couchkvstore_strerrno(db, errCode).c_str());
1685            closeDatabaseHandle(db);
1686            return errCode;
1687        }
1688
1689        cs_begin = gethrtime();
1690        errCode = couchstore_commit(db);
1691        st.commitHisto.add((gethrtime() - cs_begin) / 1000);
1692        if (errCode) {
1693            LOG(EXTENSION_LOG_WARNING,
1694                    "Warning: couchstore_commit failed, error=%s [%s]",
1695                    couchstore_strerror(errCode),
1696                    couchkvstore_strerrno(db, errCode).c_str());
1697            closeDatabaseHandle(db);
1698            return errCode;
1699        }
1700
1701        if (epStats.isShutdown) {
1702            // shutdown is in progress, no need to notify mccouch
1703            // the compactor must have already exited!
1704            closeDatabaseHandle(db);
1705            return errCode;
1706        }
1707
1708        RememberingCallback<uint16_t> cb;
1709        uint64_t newHeaderPos = couchstore_get_header_position(db);
1710        couchNotifier->notify_headerpos_update(vbid, newFileRev, newHeaderPos, cb);
1711        if (cb.val != PROTOCOL_BINARY_RESPONSE_SUCCESS) {
1712            cb_assert(cb.val != PROTOCOL_BINARY_RESPONSE_ETMPFAIL);
1713            LOG(EXTENSION_LOG_WARNING, "Warning: failed to notify "
1714                    "CouchDB of update for vbucket=%d, error=0x%x\n",
1715                    vbid, cb.val);
1716        }
1717        st.batchSize.add(docCount);
1718
1719        // retrieve storage system stats for file fragmentation computation
1720        couchstore_db_info(db, &info);
1721        kvctx.fileSpaceUsed = info.space_used;
1722        kvctx.fileSize = info.file_size;
1723        cachedDeleteCount[vbid] = info.deleted_count;
1724        cachedDocCount[vbid] = info.doc_count;
1725        closeDatabaseHandle(db);
1726    }
1727
1728    /* update stat */
1729    if(errCode == COUCHSTORE_SUCCESS) {
1730        st.docsCommitted = docCount;
1731    }
1732
1733    return errCode;
1734}
1735
1736void CouchKVStore::remVBucketFromDbFileMap(uint16_t vbucketId)
1737{
1738    if (vbucketId >= numDbFiles) {
1739        LOG(EXTENSION_LOG_WARNING, NULL,
1740            "Warning: cannot remove db file map entry for an invalid vbucket, "
1741            "vbucket id = %d\n", vbucketId);
1742        return;
1743    }
1744
1745    // just reset revision number of the requested vbucket
1746    dbFileRevMap[vbucketId] = 1;
1747}
1748
1749void CouchKVStore::commitCallback(std::vector<CouchRequest *> &committedReqs,
1750                                  kvstats_ctx &kvctx,
1751                                  couchstore_error_t errCode)
1752{
1753    size_t commitSize = committedReqs.size();
1754
1755    for (size_t index = 0; index < commitSize; index++) {
1756        size_t dataSize = committedReqs[index]->getNBytes();
1757        size_t keySize = committedReqs[index]->getKey().length();
1758        /* update ep stats */
1759        ++epStats.io_num_write;
1760        epStats.io_write_bytes.fetch_add(keySize + dataSize);
1761
1762        if (committedReqs[index]->isDelete()) {
1763            int rv = getMutationStatus(errCode);
1764            if (rv != -1) {
1765                const std::string &key = committedReqs[index]->getKey();
1766                if (kvctx.keyStats[key].first) {
1767                    rv = 1; // Deletion is for an existing item on DB file.
1768                } else {
1769                    rv = 0; // Deletion is for a non-existing item on DB file.
1770                }
1771            }
1772            if (errCode) {
1773                ++st.numDelFailure;
1774            } else {
1775                st.delTimeHisto.add(committedReqs[index]->getDelta() / 1000);
1776            }
1777            committedReqs[index]->getDelCallback()->callback(rv);
1778        } else {
1779            int rv = getMutationStatus(errCode);
1780            const std::string &key = committedReqs[index]->getKey();
1781            bool insertion = !kvctx.keyStats[key].first;
1782            if (errCode) {
1783                ++st.numSetFailure;
1784            } else {
1785                st.writeTimeHisto.add(committedReqs[index]->getDelta() / 1000);
1786                st.writeSizeHisto.add(dataSize + keySize);
1787            }
1788            mutation_result p(rv, insertion);
1789            committedReqs[index]->getSetCallback()->callback(p);
1790        }
1791    }
1792}
1793
1794void CouchKVStore::readVBState(Db *db, uint16_t vbId, vbucket_state &vbState)
1795{
1796    sized_buf id;
1797    LocalDoc *ldoc = NULL;
1798    couchstore_error_t errCode;
1799
1800    vbState.state = vbucket_state_dead;
1801    vbState.checkpointId = 0;
1802    vbState.maxDeletedSeqno = 0;
1803
1804    id.buf = (char *)"_local/vbstate";
1805    id.size = sizeof("_local/vbstate") - 1;
1806    errCode = couchstore_open_local_document(db, (void *)id.buf,
1807                                             id.size, &ldoc);
1808    if (errCode != COUCHSTORE_SUCCESS) {
1809        LOG(EXTENSION_LOG_DEBUG,
1810            "Warning: failed to retrieve stat info for vBucket=%d error=%s [%s]",
1811            vbId, couchstore_strerror(errCode),
1812            couchkvstore_strerrno(db, errCode).c_str());
1813    } else {
1814        const std::string statjson(ldoc->json.buf, ldoc->json.size);
1815        cJSON *jsonObj = cJSON_Parse(statjson.c_str());
1816        if (!jsonObj) {
1817            LOG(EXTENSION_LOG_WARNING,
1818                "Warning: failed to parse the vbstat json doc for vbucket %d: %s",
1819                vbId, statjson.c_str());
1820            couchstore_free_local_document(ldoc);
1821            return;
1822        }
1823
1824        const std::string state = getJSONObjString(
1825                                cJSON_GetObjectItem(jsonObj, "state"));
1826        const std::string checkpoint_id = getJSONObjString(
1827                                cJSON_GetObjectItem(jsonObj,"checkpoint_id"));
1828        const std::string max_deleted_seqno = getJSONObjString(
1829                                cJSON_GetObjectItem(jsonObj, "max_deleted_seqno"));
1830        cJSON *failover_json = cJSON_GetObjectItem(jsonObj, "failover_table");
1831        if (state.compare("") == 0 || checkpoint_id.compare("") == 0
1832                || max_deleted_seqno.compare("") == 0) {
1833            LOG(EXTENSION_LOG_WARNING,
1834                "Warning: state JSON doc for vbucket %d is in the wrong format: %s",
1835                vbId, statjson.c_str());
1836        } else {
1837            vbState.state = VBucket::fromString(state.c_str());
1838            parseUint64(max_deleted_seqno.c_str(), &vbState.maxDeletedSeqno);
1839            parseUint64(checkpoint_id.c_str(), &vbState.checkpointId);
1840
1841            if (failover_json) {
1842                char* json = cJSON_PrintUnformatted(failover_json);
1843                vbState.failovers.assign(json);
1844                free(json);
1845            }
1846        }
1847        cJSON_Delete(jsonObj);
1848        couchstore_free_local_document(ldoc);
1849
1850    }
1851
1852    DbInfo info;
1853    errCode = couchstore_db_info(db, &info);
1854    if (errCode == COUCHSTORE_SUCCESS) {
1855        vbState.highSeqno = info.last_sequence;
1856        vbState.purgeSeqno = info.purge_seq;
1857    } else {
1858        LOG(EXTENSION_LOG_WARNING,
1859            "Warning: failed to read database info for vBucket = %d", id);
1860        abort();
1861    }
1862}
1863
1864couchstore_error_t CouchKVStore::saveVBState(Db *db, vbucket_state &vbState)
1865{
1866    std::stringstream jsonState;
1867
1868    jsonState << "{\"state\": \"" << VBucket::toString(vbState.state) << "\""
1869              << ",\"checkpoint_id\": \"" << vbState.checkpointId << "\""
1870              << ",\"max_deleted_seqno\": \"" << vbState.maxDeletedSeqno << "\""
1871              << ",\"failover_table\": " << vbState.failovers
1872              << "}";
1873
1874    LocalDoc lDoc;
1875    lDoc.id.buf = (char *)"_local/vbstate";
1876    lDoc.id.size = sizeof("_local/vbstate") - 1;
1877    std::string state = jsonState.str();
1878    lDoc.json.buf = (char *)state.c_str();
1879    lDoc.json.size = state.size();
1880    lDoc.deleted = 0;
1881
1882    couchstore_error_t errCode = couchstore_save_local_document(db, &lDoc);
1883    if (errCode != COUCHSTORE_SUCCESS) {
1884        LOG(EXTENSION_LOG_WARNING,
1885            "Warning: couchstore_save_local_document failed "
1886            "error=%s [%s]\n", couchstore_strerror(errCode),
1887            couchkvstore_strerrno(db, errCode).c_str());
1888    }
1889    return errCode;
1890}
1891
1892int CouchKVStore::getMultiCb(Db *db, DocInfo *docinfo, void *ctx)
1893{
1894    cb_assert(docinfo);
1895    std::string keyStr(docinfo->id.buf, docinfo->id.size);
1896    cb_assert(ctx);
1897    GetMultiCbCtx *cbCtx = static_cast<GetMultiCbCtx *>(ctx);
1898    CouchKVStoreStats &st = cbCtx->cks.getCKVStoreStat();
1899
1900
1901    vb_bgfetch_queue_t::iterator qitr = cbCtx->fetches.find(keyStr);
1902    if (qitr == cbCtx->fetches.end()) {
1903        // this could be a serious race condition in couchstore,
1904        // log a warning message and continue
1905        LOG(EXTENSION_LOG_WARNING,
1906            "Warning: couchstore returned invalid docinfo, "
1907            "no pending bgfetch has been issued for key = %s\n",
1908            keyStr.c_str());
1909        return 0;
1910    }
1911
1912    bool meta_only = true;
1913    std::list<VBucketBGFetchItem *> &fetches = (*qitr).second;
1914    std::list<VBucketBGFetchItem *>::iterator itr = fetches.begin();
1915    for (; itr != fetches.end(); ++itr) {
1916        if (!((*itr)->metaDataOnly)) {
1917            meta_only = false;
1918            break;
1919        }
1920    }
1921
1922    GetValue returnVal;
1923    couchstore_error_t errCode = cbCtx->cks.fetchDoc(db, docinfo, returnVal,
1924                                                     cbCtx->vbId, meta_only);
1925    if (errCode != COUCHSTORE_SUCCESS) {
1926        LOG(EXTENSION_LOG_WARNING, "Warning: failed to fetch data from database, "
1927            "vBucket=%d key=%s error=%s [%s]", cbCtx->vbId,
1928            keyStr.c_str(), couchstore_strerror(errCode),
1929            couchkvstore_strerrno(db, errCode).c_str());
1930        st.numGetFailure++;
1931    }
1932
1933    returnVal.setStatus(cbCtx->cks.couchErr2EngineErr(errCode));
1934    for (itr = fetches.begin(); itr != fetches.end(); ++itr) {
1935        // populate return value for remaining fetch items with the
1936        // same seqid
1937        (*itr)->value = returnVal;
1938        st.readTimeHisto.add((gethrtime() - (*itr)->initTime) / 1000);
1939        if (errCode == COUCHSTORE_SUCCESS) {
1940            st.readSizeHisto.add(returnVal.getValue()->getNKey() +
1941                                 returnVal.getValue()->getNBytes());
1942        }
1943    }
1944    return 0;
1945}
1946
1947
1948void CouchKVStore::closeDatabaseHandle(Db *db) {
1949    couchstore_error_t ret = couchstore_close_db(db);
1950    if (ret != COUCHSTORE_SUCCESS) {
1951        LOG(EXTENSION_LOG_WARNING,
1952            "Warning: couchstore_close_db failed, error=%s [%s]",
1953            couchstore_strerror(ret), couchkvstore_strerrno(NULL, ret).c_str());
1954    }
1955    st.numClose++;
1956}
1957
1958ENGINE_ERROR_CODE CouchKVStore::couchErr2EngineErr(couchstore_error_t errCode)
1959{
1960    switch (errCode) {
1961    case COUCHSTORE_SUCCESS:
1962        return ENGINE_SUCCESS;
1963    case COUCHSTORE_ERROR_ALLOC_FAIL:
1964        return ENGINE_ENOMEM;
1965    case COUCHSTORE_ERROR_DOC_NOT_FOUND:
1966        return ENGINE_KEY_ENOENT;
1967    case COUCHSTORE_ERROR_NO_SUCH_FILE:
1968    case COUCHSTORE_ERROR_NO_HEADER:
1969    default:
1970        // same as the general error return code of
1971        // EvetuallyPersistentStore::getInternal
1972        return ENGINE_TMPFAIL;
1973    }
1974}
1975
1976size_t CouchKVStore::getEstimatedItemCount(std::vector<uint16_t> &vbs)
1977{
1978    size_t items = 0;
1979    std::vector<uint16_t>::iterator it;
1980    for (it = vbs.begin(); it != vbs.end(); ++it) {
1981        items += getNumItems(*it);
1982    }
1983    return items;
1984}
1985
1986size_t CouchKVStore::getNumPersistedDeletes(uint16_t vbid) {
1987    std::map<uint16_t, size_t>::iterator itr = cachedDeleteCount.find(vbid);
1988    if (itr != cachedDeleteCount.end()) {
1989        return itr->second;
1990    }
1991
1992    if (!dbFileRevMapPopulated) {
1993        std::vector<std::string> files;
1994        // first scan all db files from data directory
1995        discoverDbFiles(dbname, files);
1996        populateFileNameMap(files);
1997    }
1998
1999    Db *db = NULL;
2000    uint64_t rev = dbFileRevMap[vbid];
2001    couchstore_error_t errCode = openDB(vbid, rev, &db,
2002                                        COUCHSTORE_OPEN_FLAG_RDONLY);
2003    if (errCode == COUCHSTORE_SUCCESS) {
2004        DbInfo info;
2005        errCode = couchstore_db_info(db, &info);
2006        if (errCode == COUCHSTORE_SUCCESS) {
2007            cachedDeleteCount[vbid] = info.deleted_count;
2008            closeDatabaseHandle(db);
2009            return info.deleted_count;
2010        } else {
2011            LOG(EXTENSION_LOG_WARNING,
2012                "Warning: failed to read database info for "
2013                "vBucket = %d rev = %llu\n", vbid, rev);
2014        }
2015        closeDatabaseHandle(db);
2016    } else {
2017        LOG(EXTENSION_LOG_WARNING,
2018            "Warning: failed to open database file for "
2019            "vBucket = %d rev = %llu\n", vbid, rev);
2020    }
2021    return 0;
2022
2023}
2024
2025size_t CouchKVStore::getNumItems(uint16_t vbid) {
2026    unordered_map<uint16_t, size_t>::iterator itr = cachedDocCount.find(vbid);
2027    if (itr != cachedDocCount.end()) {
2028        return itr->second;
2029    }
2030
2031    if (!dbFileRevMapPopulated) {
2032        std::vector<std::string> files;
2033        // first scan all db files from data directory
2034        discoverDbFiles(dbname, files);
2035        populateFileNameMap(files);
2036    }
2037
2038    Db *db = NULL;
2039    uint64_t rev = dbFileRevMap[vbid];
2040    couchstore_error_t errCode = openDB(vbid, rev, &db,
2041                                        COUCHSTORE_OPEN_FLAG_RDONLY);
2042    if (errCode == COUCHSTORE_SUCCESS) {
2043        DbInfo info;
2044        errCode = couchstore_db_info(db, &info);
2045        if (errCode == COUCHSTORE_SUCCESS) {
2046            cachedDocCount[vbid] = info.doc_count;
2047            closeDatabaseHandle(db);
2048            return info.doc_count;
2049        } else {
2050            LOG(EXTENSION_LOG_WARNING,
2051                "Warning: failed to read database info for "
2052                "vBucket = %d rev = %llu\n", vbid, rev);
2053        }
2054        closeDatabaseHandle(db);
2055    } else {
2056        LOG(EXTENSION_LOG_WARNING,
2057            "Warning: failed to open database file for "
2058            "vBucket = %d rev = %llu\n", vbid, rev);
2059    }
2060    return 0;
2061}
2062
2063size_t CouchKVStore::getNumItems(uint16_t vbid, uint64_t min_seq,
2064                                 uint64_t max_seq) {
2065    if (!dbFileRevMapPopulated) {
2066        std::vector<std::string> files;
2067        discoverDbFiles(dbname, files);
2068        populateFileNameMap(files);
2069    }
2070
2071    Db *db = NULL;
2072    uint64_t count = 0;
2073    uint64_t rev = dbFileRevMap[vbid];
2074    couchstore_error_t errCode = openDB(vbid, rev, &db,
2075                                        COUCHSTORE_OPEN_FLAG_RDONLY);
2076    if (errCode == COUCHSTORE_SUCCESS) {
2077        errCode = couchstore_changes_count(db, min_seq, max_seq, &count);
2078        if (errCode != COUCHSTORE_SUCCESS) {
2079            LOG(EXTENSION_LOG_WARNING, "Failed to get changes count for "
2080                "vBucket = %d rev = %llu", vbid, rev);
2081        }
2082        closeDatabaseHandle(db);
2083    } else {
2084        LOG(EXTENSION_LOG_WARNING, "Failed to open database file for vBucket"
2085            " = %d rev = %llu", vbid, rev);
2086    }
2087    return count;
2088}
2089
2090rollback_error_code
2091CouchKVStore::rollback(uint16_t vbid,
2092                       uint64_t rollbackSeqno,
2093                       shared_ptr<RollbackCB> cb) {
2094
2095    Db *db = NULL;
2096    DbInfo info;
2097    uint64_t fileRev = dbFileRevMap[vbid];
2098    std::stringstream dbFileName;
2099    dbFileName << dbname << "/" << vbid << ".couch." << fileRev;
2100    couchstore_error_t errCode;
2101    rollback_error_code err;
2102
2103    errCode = openDB(vbid, fileRev, &db,
2104                     (uint64_t) COUCHSTORE_OPEN_FLAG_RDONLY);
2105
2106    if (errCode == COUCHSTORE_SUCCESS) {
2107        errCode = couchstore_db_info(db, &info);
2108        if (errCode != COUCHSTORE_SUCCESS) {
2109            LOG(EXTENSION_LOG_WARNING,
2110                "Failed to read DB info, name=%s",
2111                dbFileName.str().c_str());
2112            closeDatabaseHandle(db);
2113            err.first = ENGINE_ROLLBACK;
2114            err.second = 0;
2115            return err;
2116        }
2117    } else {
2118        LOG(EXTENSION_LOG_WARNING,
2119                "Failed to open database, name=%s",
2120                dbFileName.str().c_str());
2121        err.first = ENGINE_ROLLBACK;
2122        err.second = 0;
2123        return err;
2124    }
2125
2126    uint64_t latestSeqno = info.last_sequence;
2127
2128    //Count from latest seq no to 0
2129    uint64_t totSeqCount = 0;
2130    errCode = couchstore_changes_count(db, 0, latestSeqno, &totSeqCount);
2131    if (errCode != COUCHSTORE_SUCCESS) {
2132        LOG(EXTENSION_LOG_WARNING, "Failed to get changes count for "
2133            "rollback vBucket = %d, rev = %llu", vbid, fileRev);
2134        closeDatabaseHandle(db);
2135        err.first = ENGINE_ROLLBACK;
2136        err.second = 0;
2137        return err;
2138    }
2139
2140    Db *newdb = NULL;
2141    errCode = openDB(vbid, fileRev, &newdb, 0);
2142    if (errCode != COUCHSTORE_SUCCESS) {
2143        LOG(EXTENSION_LOG_WARNING,
2144                "Failed to open database, name=%s",
2145                dbFileName.str().c_str());
2146        closeDatabaseHandle(db);
2147        err.first = ENGINE_ROLLBACK;
2148        err.second = 0;
2149        return err;
2150    }
2151
2152    while (info.last_sequence > rollbackSeqno) {
2153        errCode = couchstore_rewind_db_header(newdb);
2154        if (errCode != COUCHSTORE_SUCCESS) {
2155            LOG(EXTENSION_LOG_WARNING,
2156                    "Failed to rewind Db pointer "
2157                    "for couch file with vbid: %u, whose "
2158                    "lastSeqno: %llu, while trying to roll back "
2159                    "to seqNo: %llu", vbid, latestSeqno, rollbackSeqno);
2160            //Reset the vbucket and send the entire snapshot,
2161            //as a previous header wasn't found.
2162            closeDatabaseHandle(db);
2163            closeDatabaseHandle(newdb);
2164            err.first = ENGINE_ROLLBACK;
2165            err.second = 0;
2166            return err;
2167        }
2168        errCode = couchstore_db_info(newdb, &info);
2169        if (errCode != COUCHSTORE_SUCCESS) {
2170            LOG(EXTENSION_LOG_WARNING,
2171                "Failed to read DB info, name=%s",
2172                dbFileName.str().c_str());
2173            closeDatabaseHandle(db);
2174            closeDatabaseHandle(newdb);
2175            err.first = ENGINE_ROLLBACK;
2176            err.second = 0;
2177            return err;
2178        }
2179    }
2180
2181    //Count from latest seq no to rollback seq no
2182    uint64_t rollbackSeqCount = 0;
2183    errCode = couchstore_changes_count(db, info.last_sequence, latestSeqno,
2184                                       &rollbackSeqCount);
2185    if (errCode != COUCHSTORE_SUCCESS) {
2186        LOG(EXTENSION_LOG_WARNING, "Failed to get changes count for "
2187            "rollback vBucket = %d, rev = %llu", vbid, fileRev);
2188        closeDatabaseHandle(db);
2189        closeDatabaseHandle(newdb);
2190        err.first = ENGINE_ROLLBACK;
2191        err.second = 0;
2192        return err;
2193    }
2194
2195    if ((totSeqCount / 2) <= rollbackSeqCount) {
2196        //doresetVbucket flag set or rollback is greater than 50%,
2197        //reset the vbucket and send the entire snapshot
2198        closeDatabaseHandle(db);
2199        closeDatabaseHandle(newdb);
2200        err.first = ENGINE_ROLLBACK;
2201        err.second = 0;
2202        return err;
2203    }
2204
2205    cb->setDbHeader(newdb);
2206    shared_ptr<Callback<CacheLookup> > cl(new NoLookupCallback());
2207    LoadResponseCtx ctx;
2208    ctx.vbucketId = vbid;
2209    ctx.endSeqno = latestSeqno;
2210    ctx.keysonly = true;
2211    ctx.lookup = cl;
2212    ctx.callback = cb;
2213    ctx.stats = &epStats;
2214    errCode = couchstore_changes_since(db, info.last_sequence + 1,
2215                                       COUCHSTORE_NO_OPTIONS,
2216                                       recordDbDumpC,
2217                                       static_cast<void *>(&ctx));
2218    if (errCode != COUCHSTORE_SUCCESS) {
2219        if (errCode == COUCHSTORE_ERROR_CANCEL) {
2220            LOG(EXTENSION_LOG_WARNING,
2221                "Canceling loading database\n");
2222        } else {
2223            LOG(EXTENSION_LOG_WARNING,
2224                "Couchstore_changes_since failed, error=%s [%s]",
2225                couchstore_strerror(errCode),
2226                couchkvstore_strerrno(db, errCode).c_str());
2227        }
2228        closeDatabaseHandle(db);
2229        closeDatabaseHandle(newdb);
2230        err.first = ENGINE_ROLLBACK;
2231        err.second = 0;
2232        return err;
2233    }
2234
2235    closeDatabaseHandle(db);
2236    //Append the rewinded header to the database file, before closing handle
2237    errCode = couchstore_commit(newdb);
2238    closeDatabaseHandle(newdb);
2239
2240    if (errCode != COUCHSTORE_SUCCESS) {
2241        err.first = ENGINE_ROLLBACK;
2242        err.second = 0;
2243        return err;
2244    }
2245
2246    err.first = ENGINE_SUCCESS;
2247    err.second = info.last_sequence;
2248    return err;
2249}
2250
2251int populateAllKeys(Db *db, DocInfo *docinfo, void *ctx) {
2252    AllKeysCtx *allKeysCtx = (AllKeysCtx *)ctx;
2253    uint16_t keylen = docinfo->id.size;
2254    char *key = docinfo->id.buf;
2255    (allKeysCtx->cb)->addtoAllKeys(keylen, key);
2256    if (--(allKeysCtx->count) <= 0) {
2257        //Only when count met is less than the actual number of entries
2258        return COUCHSTORE_ERROR_CANCEL;
2259    }
2260    return COUCHSTORE_SUCCESS;
2261}
2262
2263ENGINE_ERROR_CODE CouchKVStore::getAllKeys(uint16_t vbid,
2264                                           std::string &start_key,
2265                                           uint32_t count,
2266                                           AllKeysCB *cb) {
2267    Db *db = NULL;
2268    uint64_t rev = dbFileRevMap[vbid];
2269    couchstore_error_t errCode = openDB(vbid, rev, &db,
2270                                        COUCHSTORE_OPEN_FLAG_RDONLY);
2271    if(errCode == COUCHSTORE_SUCCESS) {
2272        sized_buf ref = {NULL, 0};
2273        ref.buf = (char*) start_key.c_str();
2274        ref.size = start_key.size();
2275        AllKeysCtx ctx(cb, count);
2276        errCode = couchstore_all_docs(db, &ref, COUCHSTORE_NO_OPTIONS,
2277                                      populateAllKeys,
2278                                      static_cast<void *>(&ctx));
2279        closeDatabaseHandle(db);
2280        if (errCode == COUCHSTORE_SUCCESS ||
2281                errCode == COUCHSTORE_ERROR_CANCEL)  {
2282            return ENGINE_SUCCESS;
2283        } else {
2284            LOG(EXTENSION_LOG_WARNING, "couchstore_all_docs failed for "
2285                    "database file of vbucket = %d rev = %llu, errCode = %u\n",
2286                    vbid, rev, errCode);
2287        }
2288    } else {
2289        LOG(EXTENSION_LOG_WARNING, "Failed to open database file for "
2290                "vbucket = %d rev = %llu, errCode = %u\n", vbid, rev, errCode);
2291
2292    }
2293    return ENGINE_FAILED;
2294}
2295
2296void CouchKVStore::removeCompactFile(const std::string &dbname,
2297                                     uint16_t vbid,
2298                                     uint64_t fileRev)
2299{
2300    std::string dbfile = getDBFileName(dbname, vbid, fileRev);
2301    std::string compact_file = dbfile + ".compact";
2302    removeCompactFile(compact_file);
2303}
2304
2305void CouchKVStore::removeCompactFile(const std::string &filename)
2306{
2307    if (access(filename.c_str(), F_OK) == 0) {
2308        if (remove(filename.c_str()) == 0) {
2309            LOG(EXTENSION_LOG_WARNING,
2310                "Warning: Removed compact file '%s'", filename.c_str());
2311        }
2312        else {
2313            LOG(EXTENSION_LOG_WARNING,
2314                "Warning: Failed to remove compact file '%s': %s",
2315                filename.c_str(), getSystemStrerror().c_str());
2316        }
2317    }
2318}
2319
2320
2321/* end of couch-kvstore.cc */
2322