xref: /3.0.3-GA/ep-engine/src/mutation_log.cc (revision 9e11a4ab)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2011 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "config.h"
19
20#include <sys/stat.h>
21
22#include <algorithm>
23#include <string>
24#include <utility>
25
26extern "C" {
27#include "crc32.h"
28}
29#include "ep_engine.h"
30#include "mutation_log.h"
31
// Printable names of the mutation log entry types, terminated by a NULL
// sentinel so the table can be walked without knowing its length.
// NOTE(review): the order presumably mirrors the mutation_log_type_t enum
// (ML_NEW, ML_DEL, ML_DEL_ALL, ML_COMMIT1, ML_COMMIT2) declared in
// mutation_log.h -- confirm before indexing this table by entry type.
const char *mutation_log_type_names[] = {
    "new", "del", "del_all", "commit1", "commit2", NULL
};
35
36
37#ifdef WIN32
38ssize_t pread(file_handle_t fd, void *buf, size_t nbyte, uint64_t offset)
39{
40    DWORD bytesread;
41    OVERLAPPED winoffs;
42    memset(&winoffs, 0, sizeof(winoffs));
43    winoffs.Offset = offset & 0xFFFFFFFF;
44    winoffs.OffsetHigh = (offset >> 32) & 0x7FFFFFFF;
45    if (!ReadFile(fd, buf, nbyte, &bytesread, &winoffs)) {
46        /* luckily we don't check errno so we don't need to care about that */
47        return -1;
48    }
49
50    return bytesread;
51}
52
53ssize_t pwrite(file_handle_t fd, const void *buf, size_t nbyte,
54               uint64_t offset)
55{
56    DWORD byteswritten;
57    OVERLAPPED winoffs;
58    memset(&winoffs, 0, sizeof(winoffs));
59    winoffs.Offset = offset & 0xFFFFFFFF;
60    winoffs.OffsetHigh = (offset >> 32) & 0x7FFFFFFF;
61    if (!WriteFile(fd, buf, nbyte, &byteswritten, &winoffs)) {
62        /* luckily we don't check errno so we don't need to care about that */
63        return -1;
64    }
65
66    return byteswritten;
67}
68
69static inline ssize_t doWrite(file_handle_t fd, const uint8_t *buf,
70                              size_t nbytes) {
71    DWORD byteswritten;
72    if (!WriteFile(fd, buf, nbytes, &byteswritten, NULL)) {
73        /* luckily we don't check errno so we don't need to care about that */
74        return -1;
75    }
76
77    cb_assert(GetLastError() != ERROR_IO_PENDING);
78    return byteswritten;
79}
80
81static inline int doClose(file_handle_t fd) {
82    if (CloseHandle(fd)) {
83        return 0;
84    } else {
85        return -1;
86    }
87}
88
89static inline int doFsync(file_handle_t fd) {
90    if (FlushFileBuffers(fd)) {
91        return 0;
92    } else {
93        return -1;
94    }
95}
96
97static inline std::string getErrorString(void) {
98    std::string ret;
99    char* win_msg = NULL;
100    DWORD err = GetLastError();
101    FormatMessageA(FORMAT_MESSAGE_ALLOCATE_BUFFER |
102        FORMAT_MESSAGE_FROM_SYSTEM |
103        FORMAT_MESSAGE_IGNORE_INSERTS,
104        NULL, err,
105        MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
106        (LPTSTR)&win_msg,
107        0, NULL);
108    ret.assign(win_msg);
109    LocalFree(win_msg);
110    return ret;
111}
112
113static int64_t SeekFile(file_handle_t fd, const std::string &fname,
114                        uint64_t offset, bool end)
115{
116    LARGE_INTEGER li;
117    li.QuadPart = offset;
118
119    if (end) {
120        li.LowPart = SetFilePointer(fd, li.LowPart, &li.HighPart, FILE_END);
121    } else {
122        li.LowPart = SetFilePointer(fd, li.LowPart, &li.HighPart, FILE_BEGIN);
123    }
124
125    if (li.LowPart == INVALID_SET_FILE_POINTER && GetLastError() != NO_ERROR) {
126        std::stringstream ss;
127        ss << "FATAL: SetFilePointer failed " << fname << ": " <<
128              getErrorString();
129        LOG(EXTENSION_LOG_WARNING, ss.str().c_str());
130        li.QuadPart = -1;
131    }
132
133    return li.QuadPart;
134}
135
136file_handle_t OpenFile(const std::string &fname, std::string &error,
137                       bool rdonly) {
138    file_handle_t fd;
139    if (rdonly) {
140        fd = CreateFile(const_cast<char*>(fname.c_str()),
141            GENERIC_READ,
142            FILE_SHARE_READ | FILE_SHARE_WRITE,
143            NULL,
144            OPEN_EXISTING,
145            FILE_ATTRIBUTE_NORMAL,
146            NULL);
147    } else {
148        fd = CreateFile(const_cast<char*>(fname.c_str()),
149            GENERIC_READ | GENERIC_WRITE,
150            FILE_SHARE_READ | FILE_SHARE_WRITE,
151            NULL,
152            OPEN_ALWAYS,
153            FILE_ATTRIBUTE_NORMAL,
154            NULL);
155    }
156
157    if (fd == INVALID_FILE_VALUE) {
158        error.assign(getErrorString());
159    }
160
161    return fd;
162}
163
164int64_t getFileSize(file_handle_t fd) {
165    LARGE_INTEGER li;
166    if (GetFileSizeEx(fd, &li)) {
167        return li.QuadPart;
168    }
169    abort();
170}
171
172#else
173
174static inline ssize_t doWrite(file_handle_t fd, const uint8_t *buf,
175                              size_t nbytes) {
176    ssize_t ret;
177    while ((ret = write(fd, buf, nbytes)) == -1 && (errno == EINTR)) {
178        /* Retry */
179    }
180    return ret;
181}
182
183static inline int doClose(file_handle_t fd) {
184    int ret;
185    while ((ret = close(fd)) == -1 && (errno == EINTR)) {
186        /* Retry */
187    }
188    return ret;
189}
190
191static inline int doFsync(file_handle_t fd) {
192    int ret;
193    while ((ret = fsync(fd)) == -1 && (errno == EINTR)) {
194        /* Retry */
195    }
196    return ret;
197}
198
199static int64_t SeekFile(file_handle_t fd, const std::string &fname,
200                        uint64_t offset, bool end)
201{
202    int64_t ret;
203    if (end) {
204        ret = lseek(fd, offset, SEEK_END);
205    } else {
206        ret = lseek(fd, offset, SEEK_SET);
207    }
208
209    if (ret < 0) {
210        LOG(EXTENSION_LOG_WARNING, "FATAL: lseek failed '%s': %s",
211            fname.c_str(),
212            strerror(errno));
213    }
214    return ret;
215}
216
217file_handle_t OpenFile(const std::string &fname, std::string &error,
218                       bool rdonly) {
219    file_handle_t fd;
220    if (rdonly) {
221        fd = ::open(const_cast<char*>(fname.c_str()), O_RDONLY);
222    } else {
223        fd = ::open(const_cast<char*>(fname.c_str()), O_RDWR | O_CREAT, 0666);
224    }
225
226    if (fd < 0) {
227        error.assign(strerror(errno));
228    }
229
230    return fd;
231}
232
233int64_t getFileSize(file_handle_t fd) {
234    struct stat st;
235    int stat_result = fstat(fd, &st);
236    cb_assert(stat_result == 0);
237    return st.st_size;
238}
239#endif
240
241
242static void writeFully(file_handle_t fd, const uint8_t *buf, size_t nbytes) {
243    while (nbytes > 0) {
244        ssize_t written = doWrite(fd, buf, nbytes);
245        cb_assert(written >= 0);
246
247        nbytes -= written;
248        buf += written;
249    }
250}
251
252uint64_t MutationLogEntry::rowid() const {
253    return ntohll(_rowid);
254}
255
256MutationLog::MutationLog(const std::string &path,
257                         const size_t bs)
258    : paddingHisto(GrowingWidthGenerator<uint32_t>(0, 8, 1.5), 32),
259    logPath(path),
260    blockSize(bs),
261    blockPos(HEADER_RESERVED),
262    file(INVALID_FILE_VALUE),
263    disabled(false),
264    entries(0),
265    entryBuffer(static_cast<uint8_t*>(calloc(MutationLogEntry::len(256), 1))),
266    blockBuffer(static_cast<uint8_t*>(calloc(bs, 1))),
267    syncConfig(DEFAULT_SYNC_CONF),
268    readOnly(false)
269{
270    for (int ii = 0; ii < MUTATION_LOG_TYPES; ++ii) {
271        itemsLogged[ii].store(0);
272    }
273    logSize.store(0);
274
275    cb_assert(entryBuffer);
276    cb_assert(blockBuffer);
277    if (logPath == "") {
278        disabled = true;
279    }
280}
281
282MutationLog::~MutationLog() {
283    flush();
284    close();
285    free(entryBuffer);
286    free(blockBuffer);
287}
288
289void MutationLog::disable() {
290    if (file >= 0) {
291        close();
292        disabled = true;
293    }
294}
295
296void MutationLog::newItem(uint16_t vbucket, const std::string &key,
297                          uint64_t rowid) {
298    if (isEnabled()) {
299        MutationLogEntry *mle = MutationLogEntry::newEntry(entryBuffer,
300                                                           rowid, ML_NEW,
301                                                           vbucket, key);
302        writeEntry(mle);
303    }
304}
305
306void MutationLog::delItem(uint16_t vbucket, const std::string &key) {
307    if (isEnabled()) {
308        MutationLogEntry *mle = MutationLogEntry::newEntry(entryBuffer,
309                                                           0, ML_DEL, vbucket,
310                                                           key);
311        writeEntry(mle);
312    }
313}
314
315void MutationLog::deleteAll(uint16_t vbucket) {
316    if (isEnabled()) {
317        MutationLogEntry *mle = MutationLogEntry::newEntry(entryBuffer,
318                                                           0, ML_DEL_ALL,
319                                                           vbucket, "");
320        writeEntry(mle);
321    }
322}
323
324void MutationLog::sync() {
325    cb_assert(isOpen());
326    BlockTimer timer(&syncTimeHisto);
327    int fsyncResult = doFsync(file);
328    cb_assert(fsyncResult != -1);
329}
330
331void MutationLog::commit1() {
332    if (isEnabled()) {
333        MutationLogEntry *mle = MutationLogEntry::newEntry(entryBuffer,
334                                                           0, ML_COMMIT1, 0,
335                                                           "");
336        writeEntry(mle);
337        if ((getSyncConfig() & FLUSH_COMMIT_1) != 0) {
338            flush();
339        }
340        if ((getSyncConfig() & SYNC_COMMIT_1) != 0) {
341            sync();
342        }
343    }
344}
345
346void MutationLog::commit2() {
347    if (isEnabled()) {
348        MutationLogEntry *mle = MutationLogEntry::newEntry(entryBuffer,
349                                                           0, ML_COMMIT2, 0,
350                                                           "");
351        writeEntry(mle);
352        if ((getSyncConfig() & FLUSH_COMMIT_2) != 0) {
353            flush();
354        }
355        if ((getSyncConfig() & SYNC_COMMIT_2) != 0) {
356            sync();
357        }
358    }
359}
360
361bool MutationLog::writeInitialBlock() {
362    cb_assert(!readOnly);
363    cb_assert(isEnabled());
364    cb_assert(isOpen());
365    headerBlock.set(blockSize);
366
367    writeFully(file, (uint8_t*)&headerBlock, sizeof(headerBlock));
368
369    int64_t seek_result = SeekFile(file, getLogFile(),
370                            std::max(
371                            static_cast<uint32_t>(MIN_LOG_HEADER_SIZE),
372                            headerBlock.blockSize() * headerBlock.blockCount())
373                             - 1, false);
374    if (seek_result < 0) {
375        LOG(EXTENSION_LOG_WARNING, "FATAL: lseek failed '%s': %s",
376            getLogFile().c_str(), strerror(errno));
377        return false;
378    }
379    uint8_t zero(0);
380    writeFully(file, &zero, sizeof(zero));
381    return true;
382}
383
384void MutationLog::readInitialBlock() {
385    cb_assert(isOpen());
386    uint8_t buf[MIN_LOG_HEADER_SIZE];
387    ssize_t bytesread = pread(file, buf, sizeof(buf), 0);
388
389    if (bytesread != sizeof(buf)) {
390        LOG(EXTENSION_LOG_WARNING, "FATAL: initial block read failed"
391                "'%s': %s", getLogFile().c_str(), strerror(errno));
392        throw ShortReadException();
393    }
394
395    headerBlock.set(buf, sizeof(buf));
396
397    // These are reserved for future use.
398    if (headerBlock.version() != LOG_VERSION ||
399            headerBlock.blockCount() != 1) {
400        std::stringstream ss;
401        ss << "HeaderBlock version/blockCount mismatch";
402        throw ReadException(ss.str());
403    }
404
405    blockSize = headerBlock.blockSize();
406}
407
408void MutationLog::updateInitialBlock() {
409    cb_assert(!readOnly);
410    cb_assert(isOpen());
411    needWriteAccess();
412
413    uint8_t buf[MIN_LOG_HEADER_SIZE];
414    memset(buf, 0, sizeof(buf));
415    memcpy(buf, (uint8_t*)&headerBlock, sizeof(headerBlock));
416
417    ssize_t byteswritten = pwrite(file, buf, sizeof(buf), 0);
418
419    // @todo we need a write exception
420    if (byteswritten != sizeof(buf)) {
421        throw WriteException("Failed to update header block");
422    }
423}
424
425bool MutationLog::prepareWrites() {
426    if (isEnabled()) {
427        cb_assert(isOpen());
428        int64_t seek_result = SeekFile(file, getLogFile(), 0, true);
429        if (seek_result < 0) {
430            return false;
431        }
432        int64_t unaligned_bytes = seek_result % blockSize;
433        if (unaligned_bytes != 0) {
434            LOG(EXTENSION_LOG_WARNING,
435                    "WARNING: filesize %d not block aligned", seek_result,
436                    "'%s': %s", getLogFile().c_str(), strerror(errno));
437            if (blockSize < seek_result) {
438                if (SeekFile(file, getLogFile(),
439                    seek_result - unaligned_bytes, false) < 0) {
440                    return false;
441                }
442            } else {
443                throw ShortReadException();
444            }
445        }
446        logSize = static_cast<size_t>(seek_result);
447    }
448    return true;
449}
450
/*
 * Translate a sync/flush policy keyword into its numeric value:
 * "off" -> 0, "commit1" -> 1, "commit2" -> 2, "full" -> 3 (= 1 | 2).
 * Any other string (the match is case-sensitive) yields 0xff.
 */
static uint8_t parseConfigString(const std::string &s) {
    static const struct {
        const char *keyword;
        uint8_t value;
    } policies[] = {
        {"off", 0},
        {"commit1", 1},
        {"commit2", 2},
        {"full", 3},
    };
    const size_t count = sizeof(policies) / sizeof(policies[0]);
    for (size_t idx = 0; idx < count; ++idx) {
        if (s == policies[idx].keyword) {
            return policies[idx].value;
        }
    }
    return 0xff;
}
466
467bool MutationLog::setSyncConfig(const std::string &s) {
468    uint8_t v(parseConfigString(s));
469    if (v != 0xff) {
470        syncConfig = (syncConfig & ~SYNC_FULL) | v;
471    }
472    return v != 0xff;
473}
474
475bool MutationLog::setFlushConfig(const std::string &s) {
476    uint8_t v(parseConfigString(s));
477    if (v != 0xff) {
478        syncConfig = (syncConfig & ~FLUSH_FULL) | (v << 2);
479    }
480    return v != 0xff;
481}
482
483bool MutationLog::exists() const {
484    return access(logPath.c_str(), F_OK) == 0;
485}
486
487void MutationLog::open(bool _readOnly) {
488    if (!isEnabled()) {
489        return;
490    }
491    readOnly = _readOnly;
492    std::string error;
493    if (readOnly) {
494        if (!exists()) {
495            throw FileNotFoundException(logPath);
496        }
497        file = OpenFile(logPath, error, true);
498    } else {
499        file = OpenFile(logPath, error, false);
500    }
501
502    if (file == INVALID_FILE_VALUE) {
503        std::stringstream ss;
504        ss << "Unable to open log file: " << error; // strerror(errno);
505        throw ReadException(ss.str());
506    }
507
508    int64_t size = getFileSize(file);
509    if (size && size < static_cast<int64_t>(MIN_LOG_HEADER_SIZE)) {
510        try {
511            LOG(EXTENSION_LOG_WARNING, "WARNING: Corrupted access log '%s'",
512                    getLogFile().c_str());
513            reset();
514            return;
515        } catch (ShortReadException &) {
516            close();
517            disabled = true;
518            throw ShortReadException();
519        }
520    }
521    if (size == 0) {
522        if (!writeInitialBlock()) {
523            close();
524            disabled = true;
525            return;
526        }
527    } else {
528        try {
529            readInitialBlock();
530        } catch (ShortReadException &) {
531            close();
532            disabled = true;
533            throw ShortReadException();
534        }
535
536        if (!readOnly) {
537            headerBlock.setRdwr(1);
538            updateInitialBlock();
539        }
540    }
541
542    if (!prepareWrites()) {
543        close();
544        disabled = true;
545        return;
546    }
547
548    cb_assert(isOpen());
549}
550
551void MutationLog::close() {
552    if (!isEnabled() || !isOpen()) {
553        return;
554    }
555
556    if (!readOnly) {
557        flush();
558        sync();
559        headerBlock.setRdwr(0);
560        updateInitialBlock();
561    }
562
563    int close_result = doClose(file);
564    cb_assert(close_result != -1);
565    file = INVALID_FILE_VALUE;
566}
567
568bool MutationLog::reset() {
569    if (!isEnabled()) {
570        return false;
571    }
572    close();
573
574    if (remove(getLogFile().c_str()) == -1) {
575        LOG(EXTENSION_LOG_WARNING, "FATAL: Failed to remove '%s': %s",
576            getLogFile().c_str(), strerror(errno));
577        return false;
578    }
579
580    open();
581    LOG(EXTENSION_LOG_INFO, "Reset a mutation log '%s' successfully.",
582        getLogFile().c_str());
583    return true;
584}
585
586bool MutationLog::replaceWith(MutationLog &mlog) {
587    cb_assert(mlog.isEnabled());
588    cb_assert(isEnabled());
589
590    mlog.flush();
591    mlog.close();
592    flush();
593    close();
594
595    for (int i(0); i < MUTATION_LOG_TYPES; ++i) {
596        itemsLogged[i].store(mlog.itemsLogged[i]);
597    }
598
599    if (rename(mlog.getLogFile().c_str(), getLogFile().c_str()) != 0) {
600        open();
601        std::stringstream ss;
602        ss <<
603        "Unable to rename a mutation log \"" << mlog.getLogFile() << "\" "
604           << "to \"" << getLogFile() << "\": " << strerror(errno);
605        LOG(EXTENSION_LOG_WARNING, "%s!!! Reopened the old log file",
606            ss.str().c_str());
607        return false;
608    }
609
610    open();
611    LOG(EXTENSION_LOG_INFO,
612        "Renamed a mutation log \"%s\" to \"%s\" and reopened it",
613        mlog.getLogFile().c_str(), getLogFile().c_str());
614    return true;
615}
616
617void MutationLog::flush() {
618    if (isEnabled() && blockPos > HEADER_RESERVED) {
619        cb_assert(isOpen());
620        needWriteAccess();
621        BlockTimer timer(&flushTimeHisto);
622
623        if (blockPos < blockSize) {
624            size_t padding(blockSize - blockPos);
625            memset(blockBuffer + blockPos, 0x00, padding);
626            paddingHisto.add(padding);
627        }
628
629        entries = htons(entries);
630        memcpy(blockBuffer + 2, &entries, sizeof(entries));
631
632        uint32_t crc32(crc32buf(blockBuffer + 2, blockSize - 2));
633        uint16_t crc16(htons(crc32 & 0xffff));
634        memcpy(blockBuffer, &crc16, sizeof(crc16));
635
636        writeFully(file, blockBuffer, blockSize);
637        logSize.fetch_add(blockSize);
638
639        blockPos = HEADER_RESERVED;
640        entries = 0;
641    }
642}
643
644void MutationLog::writeEntry(MutationLogEntry *mle) {
645    cb_assert(isEnabled());
646    cb_assert(isOpen());
647    needWriteAccess();
648
649    size_t len(mle->len());
650    if (blockPos + len > blockSize) {
651        flush();
652    }
653    cb_assert(len < blockSize);
654
655    memcpy(blockBuffer + blockPos, mle, len);
656    blockPos += len;
657    ++entries;
658
659    ++itemsLogged[mle->type()];
660
661    delete mle;
662}
663
664static const char* logType(uint8_t t) {
665    switch(t) {
666    case ML_NEW:
667        return "new";
668        break;
669    case ML_DEL:
670        return "del";
671        break;
672    case ML_DEL_ALL:
673        return "delall";
674        break;
675    case ML_COMMIT1:
676        return "commit1";
677        break;
678    case ML_COMMIT2:
679        return "commit2";
680        break;
681    }
682    return "UNKNOWN";
683}
684
685// ----------------------------------------------------------------------
686// Mutation log iterator
687// ----------------------------------------------------------------------
688
689MutationLog::iterator::iterator(const MutationLog *l, bool e)
690  : log(l),
691    entryBuf(NULL),
692    buf(NULL),
693    p(buf),
694    offset(l->header().blockSize() * l->header().blockCount()),
695    items(0),
696    isEnd(e)
697{
698    cb_assert(log);
699}
700
701MutationLog::iterator::iterator(const MutationLog::iterator& mit)
702  : log(mit.log),
703    entryBuf(NULL),
704    buf(NULL),
705    p(NULL),
706    offset(mit.offset),
707    items(mit.items),
708    isEnd(mit.isEnd)
709{
710    cb_assert(log);
711    if (mit.buf != NULL) {
712        buf = static_cast<uint8_t*>(calloc(1, log->header().blockSize()));
713        cb_assert(buf);
714        memcpy(buf, mit.buf, log->header().blockSize());
715        p = buf + (mit.p - mit.buf);
716    }
717
718    if (mit.entryBuf != NULL) {
719        entryBuf = static_cast<uint8_t*>(calloc(1, LOG_ENTRY_BUF_SIZE));
720        cb_assert(entryBuf);
721        memcpy(entryBuf, mit.entryBuf, LOG_ENTRY_BUF_SIZE);
722    }
723}
724
725MutationLog::iterator::~iterator() {
726    free(entryBuf);
727    free(buf);
728}
729
730void MutationLog::iterator::prepItem() {
731    MutationLogEntry *e = MutationLogEntry::newEntry(p,
732                                                     bufferBytesRemaining());
733    if (entryBuf == NULL) {
734        entryBuf = static_cast<uint8_t*>(calloc(1, LOG_ENTRY_BUF_SIZE));
735        cb_assert(entryBuf);
736    }
737    memcpy(entryBuf, p, e->len());
738}
739
740MutationLog::iterator& MutationLog::iterator::operator++() {
741    if (--items == 0) {
742        nextBlock();
743    } else {
744        size_t l(operator*()->len());
745        p += l;
746
747        prepItem();
748    }
749    return *this;
750}
751
752MutationLog::iterator& MutationLog::iterator::operator++(int) {
753    abort();
754    return *this;
755}
756
757bool MutationLog::iterator::operator==(const MutationLog::iterator& rhs) {
758    return log->fd() == rhs.log->fd()
759        && (
760            (isEnd == rhs.isEnd)
761            || (offset == rhs.offset
762                && items == rhs.items));
763}
764
765bool MutationLog::iterator::operator!=(const MutationLog::iterator& rhs) {
766    return ! operator==(rhs);
767}
768
769const MutationLogEntry* MutationLog::iterator::operator*() {
770    cb_assert(entryBuf != NULL);
771    return MutationLogEntry::newEntry(entryBuf, LOG_ENTRY_BUF_SIZE);
772}
773
774size_t MutationLog::iterator::bufferBytesRemaining() {
775    return log->header().blockSize() - (p - buf);
776}
777
778void MutationLog::iterator::nextBlock() {
779    cb_assert(!log->isEnabled() || log->isOpen());
780    if (buf == NULL) {
781        buf = static_cast<uint8_t*>(calloc(1, log->header().blockSize()));
782        cb_assert(buf);
783    }
784    p = buf;
785
786    ssize_t bytesread = pread(log->fd(), buf, log->header().blockSize(),
787                              offset);
788    if (bytesread < 1) {
789        isEnd = true;
790        return;
791    }
792    if (bytesread != (ssize_t)(log->header().blockSize())) {
793        LOG(EXTENSION_LOG_WARNING, "FATAL: too few bytes read in access log"
794                "'%s': %s", log->getLogFile().c_str(), strerror(errno));
795        throw ShortReadException();
796    }
797    offset += bytesread;
798
799    uint32_t crc32(crc32buf(buf + 2, log->header().blockSize() - 2));
800    uint16_t computed_crc16(crc32 & 0xffff);
801    uint16_t retrieved_crc16;
802    memcpy(&retrieved_crc16, buf, sizeof(retrieved_crc16));
803    retrieved_crc16 = ntohs(retrieved_crc16);
804    if (computed_crc16 != retrieved_crc16) {
805        throw CRCReadException();
806    }
807
808    memcpy(&items, buf + 2, 2);
809    items = ntohs(items);
810
811    p = p + 4;
812
813    prepItem();
814}
815
816void MutationLog::resetCounts(size_t *items) {
817    for (int i(0); i < MUTATION_LOG_TYPES; ++i) {
818        itemsLogged[i] = items[i];
819    }
820}
821
822// ----------------------------------------------------------------------
823// Reading entries
824// ----------------------------------------------------------------------
825
/*
 * Replay the whole mutation log and rebuild, for each vbucket in vbid_set,
 * the map of committed keys to rowids in `committed`.
 *
 * - ML_NEW / ML_DEL entries for tracked vbuckets are staged in `loading`
 *   (a later entry for the same key overwrites an earlier one).
 * - ML_DEL_ALL for a tracked vbucket discards that vbucket's staged entries
 *   and schedules its committed map to be cleared at the next ML_COMMIT2.
 * - ML_COMMIT2 first clears the scheduled committed maps.  It then applies
 *   the staged entries (NEW inserts/overwrites the rowid, DEL erases the
 *   key) and empties `loading`.
 * - ML_COMMIT1 is only counted.
 * - Any other entry type aborts the process.
 *
 * Every entry's type is tallied in itemsSeen.  Entries staged after the last
 * ML_COMMIT2 remain in `loading` (they are what getUncommitted() reports).
 *
 * @return true if the last entry read was an ML_COMMIT2; false otherwise,
 *         including for an empty log.
 */
bool MutationLogHarvester::load() {
    bool clean(false);
    // vbuckets that saw ML_DEL_ALL since the last ML_COMMIT2.
    std::set<uint16_t> shouldClear;
    for (MutationLog::iterator it(mlog.begin()); it != mlog.end(); ++it) {
        const MutationLogEntry *le = *it;
        ++itemsSeen[le->type()];
        // Only an ML_COMMIT2 as the final entry leaves the log "clean".
        clean = false;

        switch (le->type()) {
        case ML_DEL:
            // FALLTHROUGH
        case ML_NEW:
            if (vbid_set.find(le->vbucket()) != vbid_set.end()) {
                loading[le->vbucket()][le->key()] =
                                      std::make_pair(le->rowid(), le->type());
            }
            break;
        case ML_COMMIT2: {
            clean = true;
            for (std::set<uint16_t>::iterator vit(shouldClear.begin());
                 vit != shouldClear.end(); ++vit) {
                committed[*vit].clear();
            }
            shouldClear.clear();

            // Apply the staged NEW/DEL entries of every tracked vbucket.
            for (std::set<uint16_t>::const_iterator vit = vbid_set.begin();
                 vit != vbid_set.end(); ++vit) {
                uint16_t vb(*vit);

                unordered_map<std::string, mutation_log_event_t>::iterator
                              copyit2;
                for (copyit2 = loading[vb].begin();
                     copyit2 != loading[vb].end();
                     ++copyit2) {

                    mutation_log_event_t t = copyit2->second;

                    switch (t.second) {
                    case ML_NEW:
                        committed[vb][copyit2->first] = t.first;
                        break;
                    case ML_DEL:
                        committed[vb].erase(copyit2->first);
                        break;
                    default:
                        abort();
                    }
                }
            }
        }
            // Still part of the ML_COMMIT2 case: everything staged is now
            // committed.
            loading.clear();
            break;
        case ML_COMMIT1:
            // nothing in particular
            break;
        case ML_DEL_ALL:
            if (vbid_set.find(le->vbucket()) != vbid_set.end()) {
                loading[le->vbucket()].clear();
                shouldClear.insert(le->vbucket());
            }
            break;
        default:
            abort();
        }
    }
    return clean;
}
893
894void MutationLogHarvester::apply(void *arg, mlCallback mlc) {
895    for (std::set<uint16_t>::const_iterator it = vbid_set.begin();
896         it != vbid_set.end(); ++it) {
897        uint16_t vb(*it);
898
899        for (unordered_map<std::string, uint64_t>::iterator it2 =
900                                                       committed[vb].begin();
901             it2 != committed[vb].end(); ++it2) {
902            const std::string key(it2->first);
903            uint64_t rowid(it2->second);
904            if (!mlc(arg, vb, key, rowid)) { // Stop loading from an access log
905                return;
906            }
907        }
908    }
909}
910
911void MutationLogHarvester::apply(void *arg, mlCallbackWithQueue mlc) {
912    cb_assert(engine);
913    std::vector<std::pair<std::string, uint64_t> > fetches;
914    std::set<uint16_t>::const_iterator it = vbid_set.begin();
915    for (; it != vbid_set.end(); ++it) {
916        uint16_t vb(*it);
917        RCPtr<VBucket> vbucket = engine->getEpStore()->getVBucket(vb);
918        if (!vbucket) {
919            continue;
920        }
921        unordered_map<std::string, uint64_t>::iterator it2 =
922                                                         committed[vb].begin();
923        for (; it2 != committed[vb].end(); ++it2) {
924            // cannot use rowid from access log, so must read from hashtable
925            std::string key = it2->first;
926            StoredValue *v = NULL;
927            if ((v = vbucket->ht.find(key, false))) {
928                fetches.push_back(std::make_pair(it2->first, v->getBySeqno()));
929            }
930        }
931        if (!mlc(vb, fetches, arg)) {
932            return;
933        }
934        fetches.clear();
935    }
936}
937
938void MutationLogHarvester::getUncommitted(
939                             std::vector<mutation_log_uncommitted_t> &uitems) {
940
941    for (std::set<uint16_t>::const_iterator vit = vbid_set.begin();
942         vit != vbid_set.end(); ++vit) {
943        uint16_t vb(*vit);
944        mutation_log_uncommitted_t leftover;
945        leftover.vbucket = vb;
946
947        unordered_map<std::string, mutation_log_event_t>::iterator copyit2;
948        for (copyit2 = loading[vb].begin();
949             copyit2 != loading[vb].end();
950             ++copyit2) {
951
952            mutation_log_event_t t = copyit2->second;
953            leftover.key = copyit2->first;
954            leftover.rowid = t.first;
955            leftover.type = static_cast<mutation_log_type_t>(t.second);
956
957            uitems.push_back(leftover);
958        }
959    }
960}
961
962size_t MutationLogHarvester::total() {
963    size_t rv(0);
964    for (int i = 0; i < MUTATION_LOG_TYPES; ++i) {
965        rv += itemsSeen[i];
966    }
967    return rv;
968}
969
970// ----------------------------------------------------------------------
971// Output of entries
972// ----------------------------------------------------------------------
973
974std::ostream& operator <<(std::ostream &out, const MutationLogEntry &mle) {
975    out << "{MutationLogEntry rowid=" << mle.rowid()
976        << ", vbucket=" << mle.vbucket()
977        << ", magic=0x" << std::hex << static_cast<uint16_t>(mle.magic)
978        << std::dec
979        << ", type=" << logType(mle.type())
980        << ", key=``" << mle.key() << "''";
981    return out;
982}
983