1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2011 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "config.h"
19
20#include <sys/stat.h>
21
22#include <algorithm>
23#include <string>
24#include <utility>
25
26extern "C" {
27#include "crc32.h"
28}
29#include "ep_engine.h"
30#include "mutation_log.h"
31
/*
 * Printable names for the mutation log record types, terminated by a NULL
 * sentinel. The order follows new/del/del_all/commit1/commit2; it is
 * presumably meant to line up with the ML_* enum values in mutation_log.h
 * (NOTE(review): confirm against the enum definition).
 */
const char *mutation_log_type_names[] = {
    "new",
    "del",
    "del_all",
    "commit1",
    "commit2",
    NULL
};
35
36
37#ifdef WIN32
38ssize_t pread(file_handle_t fd, void *buf, size_t nbyte, uint64_t offset)
39{
40    DWORD bytesread;
41    OVERLAPPED winoffs;
42    memset(&winoffs, 0, sizeof(winoffs));
43    winoffs.Offset = offset & 0xFFFFFFFF;
44    winoffs.OffsetHigh = (offset >> 32) & 0x7FFFFFFF;
45    if (!ReadFile(fd, buf, nbyte, &bytesread, &winoffs)) {
46        /* luckily we don't check errno so we don't need to care about that */
47        return -1;
48    }
49
50    return bytesread;
51}
52
53ssize_t pwrite(file_handle_t fd, const void *buf, size_t nbyte,
54               uint64_t offset)
55{
56    DWORD byteswritten;
57    OVERLAPPED winoffs;
58    memset(&winoffs, 0, sizeof(winoffs));
59    winoffs.Offset = offset & 0xFFFFFFFF;
60    winoffs.OffsetHigh = (offset >> 32) & 0x7FFFFFFF;
61    if (!WriteFile(fd, buf, nbyte, &byteswritten, &winoffs)) {
62        /* luckily we don't check errno so we don't need to care about that */
63        return -1;
64    }
65
66    return byteswritten;
67}
68
69static inline ssize_t doWrite(file_handle_t fd, const uint8_t *buf,
70                              size_t nbytes) {
71    DWORD byteswritten;
72    if (!WriteFile(fd, buf, nbytes, &byteswritten, NULL)) {
73        /* luckily we don't check errno so we don't need to care about that */
74        return -1;
75    }
76
77    cb_assert(GetLastError() != ERROR_IO_PENDING);
78    return byteswritten;
79}
80
81static inline int doClose(file_handle_t fd) {
82    if (CloseHandle(fd)) {
83        return 0;
84    } else {
85        return -1;
86    }
87}
88
89static inline int doFsync(file_handle_t fd) {
90    if (FlushFileBuffers(fd)) {
91        return 0;
92    } else {
93        return -1;
94    }
95}
96
97static inline std::string getErrorString(void) {
98    std::string ret;
99    char* win_msg = NULL;
100    DWORD err = GetLastError();
101    FormatMessageA(FORMAT_MESSAGE_ALLOCATE_BUFFER |
102        FORMAT_MESSAGE_FROM_SYSTEM |
103        FORMAT_MESSAGE_IGNORE_INSERTS,
104        NULL, err,
105        MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
106        (LPTSTR)&win_msg,
107        0, NULL);
108    ret.assign(win_msg);
109    LocalFree(win_msg);
110    return ret;
111}
112
113static int64_t SeekFile(file_handle_t fd, const std::string &fname,
114                        uint64_t offset, bool end)
115{
116    LARGE_INTEGER li;
117    li.QuadPart = offset;
118
119    if (end) {
120        li.LowPart = SetFilePointer(fd, li.LowPart, &li.HighPart, FILE_END);
121    } else {
122        li.LowPart = SetFilePointer(fd, li.LowPart, &li.HighPart, FILE_BEGIN);
123    }
124
125    if (li.LowPart == INVALID_SET_FILE_POINTER && GetLastError() != NO_ERROR) {
126        std::stringstream ss;
127        ss << "FATAL: SetFilePointer failed " << fname << ": " <<
128              getErrorString();
129        LOG(EXTENSION_LOG_WARNING, ss.str().c_str());
130        li.QuadPart = -1;
131    }
132
133    return li.QuadPart;
134}
135
136file_handle_t OpenFile(const std::string &fname, std::string &error,
137                       bool rdonly) {
138    file_handle_t fd;
139    if (rdonly) {
140        fd = CreateFile(const_cast<char*>(fname.c_str()),
141            GENERIC_READ,
142            FILE_SHARE_READ | FILE_SHARE_WRITE,
143            NULL,
144            OPEN_EXISTING,
145            FILE_ATTRIBUTE_NORMAL,
146            NULL);
147    } else {
148        fd = CreateFile(const_cast<char*>(fname.c_str()),
149            GENERIC_READ | GENERIC_WRITE,
150            FILE_SHARE_READ | FILE_SHARE_WRITE,
151            NULL,
152            OPEN_ALWAYS,
153            FILE_ATTRIBUTE_NORMAL,
154            NULL);
155    }
156
157    if (fd == INVALID_FILE_VALUE) {
158        error.assign(getErrorString());
159    }
160
161    return fd;
162}
163
164int64_t getFileSize(file_handle_t fd) {
165    LARGE_INTEGER li;
166    if (GetFileSizeEx(fd, &li)) {
167        return li.QuadPart;
168    }
169    abort();
170}
171
172#else
173
174static inline ssize_t doWrite(file_handle_t fd, const uint8_t *buf,
175                              size_t nbytes) {
176    ssize_t ret;
177    while ((ret = write(fd, buf, nbytes)) == -1 && (errno == EINTR)) {
178        /* Retry */
179    }
180    return ret;
181}
182
183static inline int doClose(file_handle_t fd) {
184    int ret;
185    while ((ret = close(fd)) == -1 && (errno == EINTR)) {
186        /* Retry */
187    }
188    return ret;
189}
190
191static inline int doFsync(file_handle_t fd) {
192    int ret;
193    while ((ret = fsync(fd)) == -1 && (errno == EINTR)) {
194        /* Retry */
195    }
196    return ret;
197}
198
199static int64_t SeekFile(file_handle_t fd, const std::string &fname,
200                        uint64_t offset, bool end)
201{
202    int64_t ret;
203    if (end) {
204        ret = lseek(fd, offset, SEEK_END);
205    } else {
206        ret = lseek(fd, offset, SEEK_SET);
207    }
208
209    if (ret < 0) {
210        LOG(EXTENSION_LOG_WARNING, "FATAL: lseek failed '%s': %s",
211            fname.c_str(),
212            strerror(errno));
213    }
214    return ret;
215}
216
217file_handle_t OpenFile(const std::string &fname, std::string &error,
218                       bool rdonly) {
219    file_handle_t fd;
220    if (rdonly) {
221        fd = ::open(const_cast<char*>(fname.c_str()), O_RDONLY);
222    } else {
223        fd = ::open(const_cast<char*>(fname.c_str()), O_RDWR | O_CREAT, 0666);
224    }
225
226    if (fd < 0) {
227        error.assign(strerror(errno));
228    }
229
230    return fd;
231}
232
233int64_t getFileSize(file_handle_t fd) {
234    struct stat st;
235    int stat_result = fstat(fd, &st);
236    cb_assert(stat_result == 0);
237    return st.st_size;
238}
239#endif
240
241
242static void writeFully(file_handle_t fd, const uint8_t *buf, size_t nbytes) {
243    while (nbytes > 0) {
244        ssize_t written = doWrite(fd, buf, nbytes);
245        cb_assert(written >= 0);
246
247        nbytes -= written;
248        buf += written;
249    }
250}
251
252uint64_t MutationLogEntry::rowid() const {
253    return ntohll(_rowid);
254}
255
256MutationLog::MutationLog(const std::string &path,
257                         const size_t bs)
258    : paddingHisto(GrowingWidthGenerator<uint32_t>(0, 8, 1.5), 32),
259    logPath(path),
260    blockSize(bs),
261    blockPos(HEADER_RESERVED),
262    file(INVALID_FILE_VALUE),
263    disabled(false),
264    entries(0),
265    entryBuffer(static_cast<uint8_t*>(calloc(MutationLogEntry::len(256), 1))),
266    blockBuffer(static_cast<uint8_t*>(calloc(bs, 1))),
267    syncConfig(DEFAULT_SYNC_CONF),
268    readOnly(false)
269{
270    for (int ii = 0; ii < MUTATION_LOG_TYPES; ++ii) {
271        itemsLogged[ii].store(0);
272    }
273    logSize.store(0);
274
275    cb_assert(entryBuffer);
276    cb_assert(blockBuffer);
277    if (logPath == "") {
278        disabled = true;
279    }
280}
281
282MutationLog::~MutationLog() {
283    flush();
284    close();
285    free(entryBuffer);
286    free(blockBuffer);
287}
288
289void MutationLog::disable() {
290    if (file >= 0) {
291        close();
292        disabled = true;
293    }
294}
295
296void MutationLog::newItem(uint16_t vbucket, const std::string &key,
297                          uint64_t rowid) {
298    if (isEnabled()) {
299        MutationLogEntry *mle = MutationLogEntry::newEntry(entryBuffer,
300                                                           rowid, ML_NEW,
301                                                           vbucket, key);
302        writeEntry(mle);
303    }
304}
305
306void MutationLog::delItem(uint16_t vbucket, const std::string &key) {
307    if (isEnabled()) {
308        MutationLogEntry *mle = MutationLogEntry::newEntry(entryBuffer,
309                                                           0, ML_DEL, vbucket,
310                                                           key);
311        writeEntry(mle);
312    }
313}
314
315void MutationLog::deleteAll(uint16_t vbucket) {
316    if (isEnabled()) {
317        MutationLogEntry *mle = MutationLogEntry::newEntry(entryBuffer,
318                                                           0, ML_DEL_ALL,
319                                                           vbucket, "");
320        writeEntry(mle);
321    }
322}
323
324void MutationLog::sync() {
325    cb_assert(isOpen());
326    BlockTimer timer(&syncTimeHisto);
327    int fsyncResult = doFsync(file);
328    cb_assert(fsyncResult != -1);
329}
330
331void MutationLog::commit1() {
332    if (isEnabled()) {
333        MutationLogEntry *mle = MutationLogEntry::newEntry(entryBuffer,
334                                                           0, ML_COMMIT1, 0,
335                                                           "");
336        writeEntry(mle);
337        if ((getSyncConfig() & FLUSH_COMMIT_1) != 0) {
338            flush();
339        }
340        if ((getSyncConfig() & SYNC_COMMIT_1) != 0) {
341            sync();
342        }
343    }
344}
345
346void MutationLog::commit2() {
347    if (isEnabled()) {
348        MutationLogEntry *mle = MutationLogEntry::newEntry(entryBuffer,
349                                                           0, ML_COMMIT2, 0,
350                                                           "");
351        writeEntry(mle);
352        if ((getSyncConfig() & FLUSH_COMMIT_2) != 0) {
353            flush();
354        }
355        if ((getSyncConfig() & SYNC_COMMIT_2) != 0) {
356            sync();
357        }
358    }
359}
360
361bool MutationLog::writeInitialBlock() {
362    cb_assert(!readOnly);
363    cb_assert(isEnabled());
364    cb_assert(isOpen());
365    headerBlock.set(blockSize);
366
367    writeFully(file, (uint8_t*)&headerBlock, sizeof(headerBlock));
368
369    int64_t seek_result = SeekFile(file, getLogFile(),
370                            std::max(
371                            static_cast<uint32_t>(MIN_LOG_HEADER_SIZE),
372                            headerBlock.blockSize() * headerBlock.blockCount())
373                             - 1, false);
374    if (seek_result < 0) {
375        LOG(EXTENSION_LOG_WARNING, "FATAL: lseek failed '%s': %s",
376            getLogFile().c_str(), strerror(errno));
377        return false;
378    }
379    uint8_t zero(0);
380    writeFully(file, &zero, sizeof(zero));
381    return true;
382}
383
384void MutationLog::readInitialBlock() {
385    cb_assert(isOpen());
386    uint8_t buf[MIN_LOG_HEADER_SIZE];
387    ssize_t bytesread = pread(file, buf, sizeof(buf), 0);
388
389    if (bytesread != sizeof(buf)) {
390        LOG(EXTENSION_LOG_WARNING, "FATAL: initial block read failed"
391                "'%s': %s", getLogFile().c_str(), strerror(errno));
392        throw ShortReadException();
393    }
394
395    headerBlock.set(buf, sizeof(buf));
396
397    // These are reserved for future use.
398    if (headerBlock.version() != LOG_VERSION ||
399            headerBlock.blockCount() != 1) {
400        std::stringstream ss;
401        ss << "HeaderBlock version/blockCount mismatch";
402        throw ReadException(ss.str());
403    }
404
405    blockSize = headerBlock.blockSize();
406}
407
408void MutationLog::updateInitialBlock() {
409    cb_assert(!readOnly);
410    cb_assert(isOpen());
411    needWriteAccess();
412
413    uint8_t buf[MIN_LOG_HEADER_SIZE];
414    memset(buf, 0, sizeof(buf));
415    memcpy(buf, (uint8_t*)&headerBlock, sizeof(headerBlock));
416
417    ssize_t byteswritten = pwrite(file, buf, sizeof(buf), 0);
418
419    // @todo we need a write exception
420    if (byteswritten != sizeof(buf)) {
421        throw WriteException("Failed to update header block");
422    }
423}
424
425bool MutationLog::prepareWrites() {
426    if (isEnabled()) {
427        cb_assert(isOpen());
428        int64_t seek_result = SeekFile(file, getLogFile(), 0, true);
429        if (seek_result < 0) {
430            return false;
431        }
432        int64_t unaligned_bytes = seek_result % blockSize;
433        if (unaligned_bytes != 0) {
434            LOG(EXTENSION_LOG_WARNING,
435                    "WARNING: filesize %d not block aligned", seek_result,
436                    "'%s': %s", getLogFile().c_str(), strerror(errno));
437            if (blockSize < (size_t)seek_result) {
438                if (SeekFile(file, getLogFile(),
439                    seek_result - unaligned_bytes, false) < 0) {
440                    LOG(EXTENSION_LOG_WARNING, "FATAL: lseek failed '%s': %s",
441                            getLogFile().c_str(), strerror(errno));
442                    return false;
443                }
444            } else {
445                throw ShortReadException();
446            }
447        }
448        logSize = static_cast<size_t>(seek_result);
449    }
450    return true;
451}
452
/*
 * Map a sync/flush configuration keyword to its numeric value:
 *   "off" -> 0, "commit1" -> 1, "commit2" -> 2, "full" -> 3.
 * The values look like bit flags ("full" == commit1 | commit2).
 * Any other string yields the sentinel 0xff.
 */
static uint8_t parseConfigString(const std::string &s) {
    static const struct {
        const char *name;
        uint8_t value;
    } table[] = {
        { "off", 0 },
        { "commit1", 1 },
        { "commit2", 2 },
        { "full", 3 },
    };
    for (size_t i = 0; i < sizeof(table) / sizeof(table[0]); ++i) {
        if (s == table[i].name) {
            return table[i].value;
        }
    }
    return 0xff;
}
468
469bool MutationLog::setSyncConfig(const std::string &s) {
470    uint8_t v(parseConfigString(s));
471    if (v != 0xff) {
472        syncConfig = (syncConfig & ~SYNC_FULL) | v;
473    }
474    return v != 0xff;
475}
476
477bool MutationLog::setFlushConfig(const std::string &s) {
478    uint8_t v(parseConfigString(s));
479    if (v != 0xff) {
480        syncConfig = (syncConfig & ~FLUSH_FULL) | (v << 2);
481    }
482    return v != 0xff;
483}
484
485bool MutationLog::exists() const {
486    return access(logPath.c_str(), F_OK) == 0;
487}
488
489void MutationLog::open(bool _readOnly) {
490    if (!isEnabled()) {
491        return;
492    }
493    readOnly = _readOnly;
494    std::string error;
495    if (readOnly) {
496        if (!exists()) {
497            throw FileNotFoundException(logPath);
498        }
499        file = OpenFile(logPath, error, true);
500    } else {
501        file = OpenFile(logPath, error, false);
502    }
503
504    if (file == INVALID_FILE_VALUE) {
505        std::stringstream ss;
506        ss << "Unable to open log file: " << error; // strerror(errno);
507        throw ReadException(ss.str());
508    }
509
510    int64_t size = getFileSize(file);
511    if (size && size < static_cast<int64_t>(MIN_LOG_HEADER_SIZE)) {
512        try {
513            LOG(EXTENSION_LOG_WARNING, "WARNING: Corrupted access log '%s'",
514                    getLogFile().c_str());
515            reset();
516            return;
517        } catch (ShortReadException &) {
518            close();
519            disabled = true;
520            throw ShortReadException();
521        }
522    }
523    if (size == 0) {
524        if (!writeInitialBlock()) {
525            close();
526            disabled = true;
527            return;
528        }
529    } else {
530        try {
531            readInitialBlock();
532        } catch (ShortReadException &) {
533            close();
534            disabled = true;
535            throw ShortReadException();
536        }
537
538        if (!readOnly) {
539            headerBlock.setRdwr(1);
540            updateInitialBlock();
541        }
542    }
543
544    if (!prepareWrites()) {
545        close();
546        disabled = true;
547        return;
548    }
549
550    cb_assert(isOpen());
551}
552
553void MutationLog::close() {
554    if (!isEnabled() || !isOpen()) {
555        return;
556    }
557
558    if (!readOnly) {
559        flush();
560        sync();
561        headerBlock.setRdwr(0);
562        updateInitialBlock();
563    }
564
565    int close_result = doClose(file);
566    cb_assert(close_result != -1);
567    file = INVALID_FILE_VALUE;
568}
569
570bool MutationLog::reset() {
571    if (!isEnabled()) {
572        return false;
573    }
574    close();
575
576    if (remove(getLogFile().c_str()) == -1) {
577        LOG(EXTENSION_LOG_WARNING, "FATAL: Failed to remove '%s': %s",
578            getLogFile().c_str(), strerror(errno));
579        return false;
580    }
581
582    open();
583    LOG(EXTENSION_LOG_INFO, "Reset a mutation log '%s' successfully.",
584        getLogFile().c_str());
585    return true;
586}
587
588bool MutationLog::replaceWith(MutationLog &mlog) {
589    cb_assert(mlog.isEnabled());
590    cb_assert(isEnabled());
591
592    mlog.flush();
593    mlog.close();
594    flush();
595    close();
596
597    for (int i(0); i < MUTATION_LOG_TYPES; ++i) {
598        itemsLogged[i].store(mlog.itemsLogged[i]);
599    }
600
601    if (rename(mlog.getLogFile().c_str(), getLogFile().c_str()) != 0) {
602        open();
603        std::stringstream ss;
604        ss <<
605        "Unable to rename a mutation log \"" << mlog.getLogFile() << "\" "
606           << "to \"" << getLogFile() << "\": " << strerror(errno);
607        LOG(EXTENSION_LOG_WARNING, "%s!!! Reopened the old log file",
608            ss.str().c_str());
609        return false;
610    }
611
612    open();
613    LOG(EXTENSION_LOG_INFO,
614        "Renamed a mutation log \"%s\" to \"%s\" and reopened it",
615        mlog.getLogFile().c_str(), getLogFile().c_str());
616    return true;
617}
618
619void MutationLog::flush() {
620    if (isEnabled() && blockPos > HEADER_RESERVED) {
621        cb_assert(isOpen());
622        needWriteAccess();
623        BlockTimer timer(&flushTimeHisto);
624
625        if (blockPos < blockSize) {
626            size_t padding(blockSize - blockPos);
627            memset(blockBuffer + blockPos, 0x00, padding);
628            paddingHisto.add(padding);
629        }
630
631        entries = htons(entries);
632        memcpy(blockBuffer + 2, &entries, sizeof(entries));
633
634        uint32_t crc32(crc32buf(blockBuffer + 2, blockSize - 2));
635        uint16_t crc16(htons(crc32 & 0xffff));
636        memcpy(blockBuffer, &crc16, sizeof(crc16));
637
638        writeFully(file, blockBuffer, blockSize);
639        logSize.fetch_add(blockSize);
640
641        blockPos = HEADER_RESERVED;
642        entries = 0;
643    }
644}
645
646void MutationLog::writeEntry(MutationLogEntry *mle) {
647    cb_assert(isEnabled());
648    cb_assert(isOpen());
649    needWriteAccess();
650
651    size_t len(mle->len());
652    if (blockPos + len > blockSize) {
653        flush();
654    }
655    cb_assert(len < blockSize);
656
657    memcpy(blockBuffer + blockPos, mle, len);
658    blockPos += len;
659    ++entries;
660
661    ++itemsLogged[mle->type()];
662
663    delete mle;
664}
665
666static const char* logType(uint8_t t) {
667    switch(t) {
668    case ML_NEW:
669        return "new";
670        break;
671    case ML_DEL:
672        return "del";
673        break;
674    case ML_DEL_ALL:
675        return "delall";
676        break;
677    case ML_COMMIT1:
678        return "commit1";
679        break;
680    case ML_COMMIT2:
681        return "commit2";
682        break;
683    }
684    return "UNKNOWN";
685}
686
687// ----------------------------------------------------------------------
688// Mutation log iterator
689// ----------------------------------------------------------------------
690
691MutationLog::iterator::iterator(const MutationLog *l, bool e)
692  : log(l),
693    entryBuf(NULL),
694    buf(NULL),
695    p(buf),
696    offset(l->header().blockSize() * l->header().blockCount()),
697    items(0),
698    isEnd(e)
699{
700    cb_assert(log);
701}
702
703MutationLog::iterator::iterator(const MutationLog::iterator& mit)
704  : log(mit.log),
705    entryBuf(NULL),
706    buf(NULL),
707    p(NULL),
708    offset(mit.offset),
709    items(mit.items),
710    isEnd(mit.isEnd)
711{
712    cb_assert(log);
713    if (mit.buf != NULL) {
714        buf = static_cast<uint8_t*>(calloc(1, log->header().blockSize()));
715        cb_assert(buf);
716        memcpy(buf, mit.buf, log->header().blockSize());
717        p = buf + (mit.p - mit.buf);
718    }
719
720    if (mit.entryBuf != NULL) {
721        entryBuf = static_cast<uint8_t*>(calloc(1, LOG_ENTRY_BUF_SIZE));
722        cb_assert(entryBuf);
723        memcpy(entryBuf, mit.entryBuf, LOG_ENTRY_BUF_SIZE);
724    }
725}
726
727MutationLog::iterator::~iterator() {
728    free(entryBuf);
729    free(buf);
730}
731
732void MutationLog::iterator::prepItem() {
733    MutationLogEntry *e = MutationLogEntry::newEntry(p,
734                                                     bufferBytesRemaining());
735    if (entryBuf == NULL) {
736        entryBuf = static_cast<uint8_t*>(calloc(1, LOG_ENTRY_BUF_SIZE));
737        cb_assert(entryBuf);
738    }
739    memcpy(entryBuf, p, e->len());
740}
741
742MutationLog::iterator& MutationLog::iterator::operator++() {
743    if (--items == 0) {
744        nextBlock();
745    } else {
746        size_t l(operator*()->len());
747        p += l;
748
749        prepItem();
750    }
751    return *this;
752}
753
754MutationLog::iterator& MutationLog::iterator::operator++(int) {
755    abort();
756    return *this;
757}
758
759bool MutationLog::iterator::operator==(const MutationLog::iterator& rhs) {
760    return log->fd() == rhs.log->fd()
761        && (
762            (isEnd == rhs.isEnd)
763            || (offset == rhs.offset
764                && items == rhs.items));
765}
766
767bool MutationLog::iterator::operator!=(const MutationLog::iterator& rhs) {
768    return ! operator==(rhs);
769}
770
771const MutationLogEntry* MutationLog::iterator::operator*() {
772    cb_assert(entryBuf != NULL);
773    return MutationLogEntry::newEntry(entryBuf, LOG_ENTRY_BUF_SIZE);
774}
775
776size_t MutationLog::iterator::bufferBytesRemaining() {
777    return log->header().blockSize() - (p - buf);
778}
779
780void MutationLog::iterator::nextBlock() {
781    cb_assert(!log->isEnabled() || log->isOpen());
782    if (buf == NULL) {
783        buf = static_cast<uint8_t*>(calloc(1, log->header().blockSize()));
784        cb_assert(buf);
785    }
786    p = buf;
787
788    ssize_t bytesread = pread(log->fd(), buf, log->header().blockSize(),
789                              offset);
790    if (bytesread < 1) {
791        isEnd = true;
792        return;
793    }
794    if (bytesread != (ssize_t)(log->header().blockSize())) {
795        LOG(EXTENSION_LOG_WARNING, "FATAL: too few bytes read in access log"
796                "'%s': %s", log->getLogFile().c_str(), strerror(errno));
797        throw ShortReadException();
798    }
799    offset += bytesread;
800
801    uint32_t crc32(crc32buf(buf + 2, log->header().blockSize() - 2));
802    uint16_t computed_crc16(crc32 & 0xffff);
803    uint16_t retrieved_crc16;
804    memcpy(&retrieved_crc16, buf, sizeof(retrieved_crc16));
805    retrieved_crc16 = ntohs(retrieved_crc16);
806    if (computed_crc16 != retrieved_crc16) {
807        throw CRCReadException();
808    }
809
810    memcpy(&items, buf + 2, 2);
811    items = ntohs(items);
812
813    p = p + 4;
814
815    prepItem();
816}
817
818void MutationLog::resetCounts(size_t *items) {
819    for (int i(0); i < MUTATION_LOG_TYPES; ++i) {
820        itemsLogged[i] = items[i];
821    }
822}
823
824// ----------------------------------------------------------------------
825// Reading entries
826// ----------------------------------------------------------------------
827
828bool MutationLogHarvester::load() {
829    bool clean(false);
830    std::set<uint16_t> shouldClear;
831    for (MutationLog::iterator it(mlog.begin()); it != mlog.end(); ++it) {
832        const MutationLogEntry *le = *it;
833        ++itemsSeen[le->type()];
834        clean = false;
835
836        switch (le->type()) {
837        case ML_DEL:
838            // FALLTHROUGH
839        case ML_NEW:
840            if (vbid_set.find(le->vbucket()) != vbid_set.end()) {
841                loading[le->vbucket()][le->key()] =
842                                      std::make_pair(le->rowid(), le->type());
843            }
844            break;
845        case ML_COMMIT2: {
846            clean = true;
847            for (std::set<uint16_t>::iterator vit(shouldClear.begin());
848                 vit != shouldClear.end(); ++vit) {
849                committed[*vit].clear();
850            }
851            shouldClear.clear();
852
853            for (std::set<uint16_t>::const_iterator vit = vbid_set.begin();
854                 vit != vbid_set.end(); ++vit) {
855                uint16_t vb(*vit);
856
857                unordered_map<std::string, mutation_log_event_t>::iterator
858                              copyit2;
859                for (copyit2 = loading[vb].begin();
860                     copyit2 != loading[vb].end();
861                     ++copyit2) {
862
863                    mutation_log_event_t t = copyit2->second;
864
865                    switch (t.second) {
866                    case ML_NEW:
867                        committed[vb][copyit2->first] = t.first;
868                        break;
869                    case ML_DEL:
870                        committed[vb].erase(copyit2->first);
871                        break;
872                    default:
873                        abort();
874                    }
875                }
876            }
877        }
878            loading.clear();
879            break;
880        case ML_COMMIT1:
881            // nothing in particular
882            break;
883        case ML_DEL_ALL:
884            if (vbid_set.find(le->vbucket()) != vbid_set.end()) {
885                loading[le->vbucket()].clear();
886                shouldClear.insert(le->vbucket());
887            }
888            break;
889        default:
890            abort();
891        }
892    }
893    return clean;
894}
895
896void MutationLogHarvester::apply(void *arg, mlCallback mlc) {
897    for (std::set<uint16_t>::const_iterator it = vbid_set.begin();
898         it != vbid_set.end(); ++it) {
899        uint16_t vb(*it);
900
901        for (unordered_map<std::string, uint64_t>::iterator it2 =
902                                                       committed[vb].begin();
903             it2 != committed[vb].end(); ++it2) {
904            const std::string key(it2->first);
905            uint64_t rowid(it2->second);
906            if (!mlc(arg, vb, key, rowid)) { // Stop loading from an access log
907                return;
908            }
909        }
910    }
911}
912
913void MutationLogHarvester::apply(void *arg, mlCallbackWithQueue mlc) {
914    cb_assert(engine);
915    std::vector<std::pair<std::string, uint64_t> > fetches;
916    std::set<uint16_t>::const_iterator it = vbid_set.begin();
917    for (; it != vbid_set.end(); ++it) {
918        uint16_t vb(*it);
919        RCPtr<VBucket> vbucket = engine->getEpStore()->getVBucket(vb);
920        if (!vbucket) {
921            continue;
922        }
923        unordered_map<std::string, uint64_t>::iterator it2 =
924                                                         committed[vb].begin();
925        for (; it2 != committed[vb].end(); ++it2) {
926            // cannot use rowid from access log, so must read from hashtable
927            std::string key = it2->first;
928            StoredValue *v = NULL;
929            if ((v = vbucket->ht.find(key, false))) {
930                fetches.push_back(std::make_pair(it2->first, v->getBySeqno()));
931            }
932        }
933        if (!mlc(vb, fetches, arg)) {
934            return;
935        }
936        fetches.clear();
937    }
938}
939
940void MutationLogHarvester::getUncommitted(
941                             std::vector<mutation_log_uncommitted_t> &uitems) {
942
943    for (std::set<uint16_t>::const_iterator vit = vbid_set.begin();
944         vit != vbid_set.end(); ++vit) {
945        uint16_t vb(*vit);
946        mutation_log_uncommitted_t leftover;
947        leftover.vbucket = vb;
948
949        unordered_map<std::string, mutation_log_event_t>::iterator copyit2;
950        for (copyit2 = loading[vb].begin();
951             copyit2 != loading[vb].end();
952             ++copyit2) {
953
954            mutation_log_event_t t = copyit2->second;
955            leftover.key = copyit2->first;
956            leftover.rowid = t.first;
957            leftover.type = static_cast<mutation_log_type_t>(t.second);
958
959            uitems.push_back(leftover);
960        }
961    }
962}
963
964size_t MutationLogHarvester::total() {
965    size_t rv(0);
966    for (int i = 0; i < MUTATION_LOG_TYPES; ++i) {
967        rv += itemsSeen[i];
968    }
969    return rv;
970}
971
972// ----------------------------------------------------------------------
973// Output of entries
974// ----------------------------------------------------------------------
975
976std::ostream& operator <<(std::ostream &out, const MutationLogEntry &mle) {
977    out << "{MutationLogEntry rowid=" << mle.rowid()
978        << ", vbucket=" << mle.vbucket()
979        << ", magic=0x" << std::hex << static_cast<uint16_t>(mle.magic)
980        << std::dec
981        << ", type=" << logType(mle.type())
982        << ", key=``" << mle.key() << "''";
983    return out;
984}
985