xref: /4.6.4/ep-engine/src/mutation_log.cc (revision 0a68c879)
1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2015 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
18#include "config.h"
19
20#include <algorithm>
21#include <fcntl.h>
22#include <platform/strerror.h>
23#include <string>
24#include <sys/stat.h>
25#include <system_error>
26#include <utility>
27
28extern "C" {
29#include "crc32.h"
30}
31#include "ep_engine.h"
32#include "mutation_log.h"
33
// Printable names for mutation log entry types; the list ends with a null
// sentinel so it can be walked without knowing its length.
const char *mutation_log_type_names[] = {
    "new",
    "del",
    "del_all",
    "commit1",
    "commit2",
    nullptr
};
37
38#ifdef WIN32
39ssize_t pread(file_handle_t fd, void *buf, size_t nbyte, uint64_t offset)
40{
41    DWORD bytesread;
42    OVERLAPPED winoffs;
43    memset(&winoffs, 0, sizeof(winoffs));
44    winoffs.Offset = offset & 0xFFFFFFFF;
45    winoffs.OffsetHigh = (offset >> 32) & 0x7FFFFFFF;
46    if (!ReadFile(fd, buf, nbyte, &bytesread, &winoffs)) {
47        /* luckily we don't check errno so we don't need to care about that */
48        return -1;
49    }
50
51    return bytesread;
52}
53
54ssize_t pwrite(file_handle_t fd, const void *buf, size_t nbyte,
55               uint64_t offset)
56{
57    DWORD byteswritten;
58    OVERLAPPED winoffs;
59    memset(&winoffs, 0, sizeof(winoffs));
60    winoffs.Offset = offset & 0xFFFFFFFF;
61    winoffs.OffsetHigh = (offset >> 32) & 0x7FFFFFFF;
62    if (!WriteFile(fd, buf, nbyte, &byteswritten, &winoffs)) {
63        /* luckily we don't check errno so we don't need to care about that */
64        return -1;
65    }
66
67    return byteswritten;
68}
69
70static inline ssize_t doWrite(file_handle_t fd, const uint8_t *buf,
71                              size_t nbytes) {
72    DWORD byteswritten;
73    if (!WriteFile(fd, buf, nbytes, &byteswritten, NULL)) {
74        /* luckily we don't check errno so we don't need to care about that */
75        throw std::system_error(GetLastError(), std::system_category(),
76                                "doWrite: failed");
77    }
78    return byteswritten;
79}
80
81static inline void doClose(file_handle_t fd) {
82    if (!CloseHandle(fd)) {
83        throw std::system_error(GetLastError(), std::system_category(),
84                                "doClose: failed");
85    }
86}
87
88static inline void doFsync(file_handle_t fd) {
89    if (!FlushFileBuffers(fd)) {
90        throw std::system_error(GetLastError(), std::system_category(),
91                                "doFsync: failed");
92    }
93}
94
95static int64_t SeekFile(file_handle_t fd, const std::string &fname,
96                        uint64_t offset, bool end)
97{
98    LARGE_INTEGER li;
99    li.QuadPart = offset;
100
101    if (end) {
102        li.LowPart = SetFilePointer(fd, li.LowPart, &li.HighPart, FILE_END);
103    } else {
104        li.LowPart = SetFilePointer(fd, li.LowPart, &li.HighPart, FILE_BEGIN);
105    }
106
107    if (li.LowPart == INVALID_SET_FILE_POINTER && GetLastError() != NO_ERROR) {
108        std::stringstream ss;
109        ss << "FATAL: SetFilePointer failed " << fname << ": " <<
110              cb_strerror();
111        LOG(EXTENSION_LOG_WARNING, ss.str().c_str());
112        li.QuadPart = -1;
113    }
114
115    return li.QuadPart;
116}
117
118file_handle_t OpenFile(const std::string &fname, std::string &error,
119                       bool rdonly) {
120    file_handle_t fd;
121    if (rdonly) {
122        fd = CreateFile(const_cast<char*>(fname.c_str()),
123            GENERIC_READ,
124            FILE_SHARE_READ | FILE_SHARE_WRITE,
125            NULL,
126            OPEN_EXISTING,
127            FILE_ATTRIBUTE_NORMAL,
128            NULL);
129    } else {
130        fd = CreateFile(const_cast<char*>(fname.c_str()),
131            GENERIC_READ | GENERIC_WRITE,
132            FILE_SHARE_READ | FILE_SHARE_WRITE,
133            NULL,
134            OPEN_ALWAYS,
135            FILE_ATTRIBUTE_NORMAL,
136            NULL);
137    }
138
139    if (fd == INVALID_FILE_VALUE) {
140        error.assign(cb_strerror());
141    }
142
143    return fd;
144}
145
146int64_t getFileSize(file_handle_t fd) {
147    LARGE_INTEGER li;
148    if (GetFileSizeEx(fd, &li)) {
149        return li.QuadPart;
150    }
151    throw std::system_error(GetLastError(), std::system_category(),
152                            "getFileSize: failed");
153}
154
155#else
156
157static inline ssize_t doWrite(file_handle_t fd, const uint8_t *buf,
158                              size_t nbytes) {
159    ssize_t ret;
160    while ((ret = write(fd, buf, nbytes)) == -1 && (errno == EINTR)) {
161        /* Retry */
162    }
163    if (ret == -1) {
164        // Non EINTR error
165        throw std::system_error(errno, std::system_category(),
166                                "doWrite: failed");
167    }
168    return ret;
169}
170
171static inline void doClose(file_handle_t fd) {
172    int ret;
173    while ((ret = close(fd)) == -1 && (errno == EINTR)) {
174        /* Retry */
175    }
176    if (ret == -1) {
177        // Non EINTR error
178        throw std::system_error(errno, std::system_category(),
179                                "doClose: failed");
180    }
181}
182
183static inline void doFsync(file_handle_t fd) {
184    int ret;
185    while ((ret = fsync(fd)) == -1 && (errno == EINTR)) {
186        /* Retry */
187    }
188    if (ret == -1) {
189        // Non EINTR error
190        throw std::system_error(errno, std::system_category(),
191                                "doFsync: failed");
192    }
193}
194
195static int64_t SeekFile(file_handle_t fd, const std::string &fname,
196                        uint64_t offset, bool end)
197{
198    int64_t ret;
199    if (end) {
200        ret = lseek(fd, offset, SEEK_END);
201    } else {
202        ret = lseek(fd, offset, SEEK_SET);
203    }
204
205    if (ret < 0) {
206        LOG(EXTENSION_LOG_WARNING, "FATAL: lseek failed '%s': %s",
207            fname.c_str(),
208            strerror(errno));
209    }
210    return ret;
211}
212
213file_handle_t OpenFile(const std::string &fname, std::string &error,
214                       bool rdonly) {
215    file_handle_t fd;
216    if (rdonly) {
217        fd = ::open(const_cast<char*>(fname.c_str()), O_RDONLY);
218    } else {
219        fd = ::open(const_cast<char*>(fname.c_str()), O_RDWR | O_CREAT, 0666);
220    }
221
222    if (fd < 0) {
223        error.assign(strerror(errno));
224    }
225
226    return fd;
227}
228
229int64_t getFileSize(file_handle_t fd) {
230    struct stat st;
231    int stat_result = fstat(fd, &st);
232    if (stat_result != 0) {
233        throw std::system_error(errno, std::system_category(),
234                                "getFileSize: failed");
235    }
236    return st.st_size;
237}
238#endif
239
240
241static bool writeFully(file_handle_t fd, const uint8_t *buf, size_t nbytes) {
242    while (nbytes > 0) {
243        try {
244            ssize_t written = doWrite(fd, buf, nbytes);
245            nbytes -= written;
246            buf += written;
247        } catch (std::system_error& e) {
248            LOG(EXTENSION_LOG_WARNING,
249                "writeFully: Failed to write to mutation log with error: %s",
250                cb_strerror().c_str());
251
252            return false;
253        }
254    }
255
256    return true;
257}
258
259uint64_t MutationLogEntry::rowid() const {
260    return ntohll(_rowid);
261}
262
263MutationLog::MutationLog(const std::string &path,
264                         const size_t bs)
265    : paddingHisto(GrowingWidthGenerator<uint32_t>(0, 8, 1.5), 32),
266    logPath(path),
267    blockSize(bs),
268    blockPos(HEADER_RESERVED),
269    file(INVALID_FILE_VALUE),
270    disabled(false),
271    entries(0),
272    entryBuffer(new uint8_t[MutationLogEntry::len(256)]()),
273    blockBuffer(new uint8_t[bs]()),
274    syncConfig(DEFAULT_SYNC_CONF),
275    readOnly(false)
276{
277    for (int ii = 0; ii < MUTATION_LOG_TYPES; ++ii) {
278        itemsLogged[ii].store(0);
279    }
280    logSize.store(0);
281
282    if (logPath == "") {
283        disabled = true;
284    }
285}
286
287MutationLog::~MutationLog() {
288    std::stringstream ss;
289    ss << *this;
290    LOG(EXTENSION_LOG_NOTICE, "%s", ss.str().c_str());
291    LOG(EXTENSION_LOG_NOTICE, "MutationLog::~MutationLog flush");
292    flush();
293    LOG(EXTENSION_LOG_NOTICE, "MutationLog::~MutationLog close");
294    close();
295    LOG(EXTENSION_LOG_NOTICE, "MutationLog::~MutationLog done");
296}
297
298void MutationLog::disable() {
299    if (file >= 0) {
300        close();
301        disabled = true;
302    }
303}
304
305void MutationLog::newItem(uint16_t vbucket, const std::string &key,
306                          uint64_t rowid) {
307    if (isEnabled()) {
308        MutationLogEntry *mle = MutationLogEntry::newEntry(entryBuffer.get(),
309                                                           rowid, ML_NEW,
310                                                           vbucket, key);
311        writeEntry(mle);
312    }
313}
314
315void MutationLog::sync() {
316    if (!isOpen()) {
317        throw std::logic_error("MutationLog::sync: Not valid on a closed log");
318    }
319
320    BlockTimer timer(&syncTimeHisto);
321    try {
322        doFsync(file);
323    } catch (std::system_error& e) {
324        throw WriteException(e.what());
325    }
326}
327
328void MutationLog::commit1() {
329    if (isEnabled()) {
330        MutationLogEntry *mle = MutationLogEntry::newEntry(entryBuffer.get(),
331                                                           0, ML_COMMIT1, 0,
332                                                           "");
333        writeEntry(mle);
334
335        if ((getSyncConfig() & FLUSH_COMMIT_1) != 0) {
336            flush();
337        }
338        if ((getSyncConfig() & SYNC_COMMIT_1) != 0) {
339            sync();
340        }
341    }
342}
343
344void MutationLog::commit2() {
345    if (isEnabled()) {
346        MutationLogEntry *mle = MutationLogEntry::newEntry(entryBuffer.get(),
347                                                           0, ML_COMMIT2, 0,
348                                                           "");
349        writeEntry(mle);
350
351        if ((getSyncConfig() & FLUSH_COMMIT_2) != 0) {
352            flush();
353        }
354        if ((getSyncConfig() & SYNC_COMMIT_2) != 0) {
355            sync();
356        }
357    }
358}
359
360bool MutationLog::writeInitialBlock() {
361    if (readOnly) {
362        throw std::logic_error("MutationLog::writeInitialBlock: Not valid on "
363                               "a read-only log");
364    }
365    if (!isEnabled()) {
366        throw std::logic_error("MutationLog::writeInitialBlock: Not valid on "
367                               "a disabled log");
368    }
369    if (!isOpen()) {
370        throw std::logic_error("MutationLog::writeInitialBlock: Not valid on "
371                               "a closed log");
372    }
373    headerBlock.set(blockSize);
374
375    if (!writeFully(file, (uint8_t*)&headerBlock, sizeof(headerBlock))) {
376        return false;
377    }
378
379    int64_t seek_result = SeekFile(file, getLogFile(),
380                            std::max(
381                            static_cast<uint32_t>(MIN_LOG_HEADER_SIZE),
382                            headerBlock.blockSize() * headerBlock.blockCount())
383                             - 1, false);
384    if (seek_result < 0) {
385        LOG(EXTENSION_LOG_WARNING, "FATAL: lseek failed '%s': %s",
386            getLogFile().c_str(), strerror(errno));
387        return false;
388    }
389
390    uint8_t zero(0);
391    if (!writeFully(file, &zero, sizeof(zero))) {
392        return false;
393    }
394    return true;
395}
396
397void MutationLog::readInitialBlock() {
398    if (!isOpen()) {
399        throw std::logic_error("MutationLog::readInitialBlock: Not valid on "
400                               "a closed log");
401    }
402    std::array<uint8_t, MIN_LOG_HEADER_SIZE> buf;
403    ssize_t bytesread = pread(file, buf.data(), sizeof(buf), 0);
404
405    if (bytesread != sizeof(buf)) {
406        LOG(EXTENSION_LOG_WARNING, "FATAL: initial block read failed"
407                "'%s': %s", getLogFile().c_str(), strerror(errno));
408        throw ShortReadException();
409    }
410
411    headerBlock.set(buf);
412
413    // These are reserved for future use.
414    if (headerBlock.version() != LOG_VERSION ||
415            headerBlock.blockCount() != 1) {
416        std::stringstream ss;
417        ss << "HeaderBlock version/blockCount mismatch";
418        throw ReadException(ss.str());
419    }
420
421    blockSize = headerBlock.blockSize();
422}
423
424void MutationLog::updateInitialBlock() {
425    if (readOnly) {
426        throw std::logic_error("MutationLog::updateInitialBlock: Not valid on "
427                               "a read-only log");
428    }
429    if (!isOpen()) {
430        throw std::logic_error("MutationLog::updateInitialBlock: Not valid on "
431                               "a closed log");
432    }
433    needWriteAccess();
434
435    uint8_t buf[MIN_LOG_HEADER_SIZE];
436    memset(buf, 0, sizeof(buf));
437    memcpy(buf, (uint8_t*)&headerBlock, sizeof(headerBlock));
438
439    ssize_t byteswritten = pwrite(file, buf, sizeof(buf), 0);
440
441    // @todo we need a write exception
442    if (byteswritten != sizeof(buf)) {
443        throw WriteException("Failed to update header block");
444    }
445}
446
447bool MutationLog::prepareWrites() {
448    if (isEnabled()) {
449        if (!isOpen()) {
450            throw std::logic_error("MutationLog::prepareWrites: Not valid on "
451                                   "a closed log");
452        }
453        int64_t seek_result = SeekFile(file, getLogFile(), 0, true);
454        if (seek_result < 0) {
455            return false;
456        }
457        int64_t unaligned_bytes = seek_result % blockSize;
458        if (unaligned_bytes != 0) {
459            LOG(EXTENSION_LOG_WARNING,
460                "WARNING: filesize %" PRId64 " not block aligned '%s': %s",
461                seek_result, getLogFile().c_str(), strerror(errno));
462            if (blockSize < (size_t)seek_result) {
463                if (SeekFile(file, getLogFile(),
464                    seek_result - unaligned_bytes, false) < 0) {
465                    LOG(EXTENSION_LOG_WARNING, "FATAL: lseek failed '%s': %s",
466                            getLogFile().c_str(), strerror(errno));
467                    return false;
468                }
469            } else {
470                throw ShortReadException();
471            }
472        }
473        logSize = static_cast<size_t>(seek_result);
474    }
475    return true;
476}
477
/**
 * Map a configuration keyword to its numeric value:
 * "off" -> 0, "commit1" -> 1, "commit2" -> 2, "full" -> 3.
 *
 * @return the value for a recognised keyword, or 0xff for anything else.
 */
static uint8_t parseConfigString(const std::string &s) {
    // The position of each keyword in this table is its numeric value.
    static const char* const keywords[] = {
        "off", "commit1", "commit2", "full"
    };
    const uint8_t keywordCount = sizeof(keywords) / sizeof(keywords[0]);

    for (uint8_t value = 0; value < keywordCount; ++value) {
        if (s == keywords[value]) {
            return value;
        }
    }
    return 0xff;
}
493
494bool MutationLog::setSyncConfig(const std::string &s) {
495    uint8_t v(parseConfigString(s));
496    if (v != 0xff) {
497        syncConfig = (syncConfig & ~SYNC_FULL) | v;
498    }
499    return v != 0xff;
500}
501
502bool MutationLog::setFlushConfig(const std::string &s) {
503    uint8_t v(parseConfigString(s));
504    if (v != 0xff) {
505        syncConfig = (syncConfig & ~FLUSH_FULL) | (v << 2);
506    }
507    return v != 0xff;
508}
509
510bool MutationLog::exists() const {
511    return access(logPath.c_str(), F_OK) == 0;
512}
513
514void MutationLog::open(bool _readOnly) {
515    if (!isEnabled()) {
516        return;
517    }
518    readOnly = _readOnly;
519    std::string error;
520    if (readOnly) {
521        if (!exists()) {
522            throw FileNotFoundException(logPath);
523        }
524        file = OpenFile(logPath, error, true);
525    } else {
526        file = OpenFile(logPath, error, false);
527    }
528
529    if (file == INVALID_FILE_VALUE) {
530        std::stringstream ss;
531        ss << "Unable to open log file: " << error; // strerror(errno);
532        throw ReadException(ss.str());
533    }
534
535    int64_t size;
536    try {
537         size = getFileSize(file);
538    } catch (std::system_error& e) {
539        throw ReadException(e.what());
540    }
541    if (size && size < static_cast<int64_t>(MIN_LOG_HEADER_SIZE)) {
542        try {
543            LOG(EXTENSION_LOG_WARNING, "WARNING: Corrupted access log '%s'",
544                    getLogFile().c_str());
545            reset();
546            return;
547        } catch (ShortReadException &) {
548            close();
549            disabled = true;
550            throw ShortReadException();
551        }
552    }
553    if (size == 0) {
554        if (!writeInitialBlock()) {
555            close();
556            disabled = true;
557            return;
558        }
559    } else {
560        try {
561            readInitialBlock();
562        } catch (ShortReadException &) {
563            close();
564            disabled = true;
565            throw ShortReadException();
566        }
567
568        if (!readOnly) {
569            headerBlock.setRdwr(1);
570            updateInitialBlock();
571        }
572    }
573
574    if (!prepareWrites()) {
575        close();
576        disabled = true;
577        return;
578    }
579}
580
581void MutationLog::close() {
582    if (!isEnabled() || !isOpen()) {
583        return;
584    }
585
586    if (!readOnly) {
587        flush();
588        sync();
589        headerBlock.setRdwr(0);
590        updateInitialBlock();
591    }
592
593    doClose(file);
594    file = INVALID_FILE_VALUE;
595}
596
597bool MutationLog::reset() {
598    if (!isEnabled()) {
599        return false;
600    }
601    close();
602
603    if (remove(getLogFile().c_str()) == -1) {
604        LOG(EXTENSION_LOG_WARNING, "FATAL: Failed to remove '%s': %s",
605            getLogFile().c_str(), strerror(errno));
606        return false;
607    }
608
609    open();
610    LOG(EXTENSION_LOG_INFO, "Reset a mutation log '%s' successfully.",
611        getLogFile().c_str());
612    return true;
613}
614
615bool MutationLog::replaceWith(MutationLog &mlog) {
616    if (!mlog.isEnabled()) {
617        throw std::invalid_argument("MutationLog::replaceWith: "
618                                    "mlog is not enabled");
619    }
620    if (!isEnabled()) {
621        throw std::logic_error("MutationLog::replaceWith: Not valid on "
622                               "a disabled log");
623    }
624
625    if (!mlog.flush()) {
626        return false;
627    }
628    mlog.close();
629
630    if (!flush()) {
631        return false;
632    }
633    close();
634
635    for (int i(0); i < MUTATION_LOG_TYPES; ++i) {
636        itemsLogged[i].store(mlog.itemsLogged[i]);
637    }
638
639    if (rename(mlog.getLogFile().c_str(), getLogFile().c_str()) != 0) {
640        open();
641        std::stringstream ss;
642        ss <<
643        "Unable to rename a mutation log \"" << mlog.getLogFile() << "\" "
644           << "to \"" << getLogFile() << "\": " << strerror(errno);
645        LOG(EXTENSION_LOG_WARNING, "%s!!! Reopened the old log file",
646            ss.str().c_str());
647        return false;
648    }
649
650    open();
651    LOG(EXTENSION_LOG_INFO,
652        "Renamed a mutation log \"%s\" to \"%s\" and reopened it",
653        mlog.getLogFile().c_str(), getLogFile().c_str());
654    return true;
655}
656
657bool MutationLog::flush() {
658    if (isEnabled() && blockPos > HEADER_RESERVED) {
659        if (!isOpen()) {
660            throw std::logic_error("MutationLog::flush: "
661                                   "Not valid on a closed log");
662        }
663        needWriteAccess();
664        BlockTimer timer(&flushTimeHisto);
665
666        if (blockPos < blockSize) {
667            size_t padding(blockSize - blockPos);
668            memset(blockBuffer.get() + blockPos, 0x00, padding);
669            paddingHisto.add(padding);
670        }
671
672        entries = htons(entries);
673        memcpy(blockBuffer.get() + 2, &entries, sizeof(entries));
674
675        uint32_t crc32(crc32buf(blockBuffer.get() + 2, blockSize - 2));
676        uint16_t crc16(htons(crc32 & 0xffff));
677        memcpy(blockBuffer.get(), &crc16, sizeof(crc16));
678
679        if (writeFully(file, blockBuffer.get(), blockSize)) {
680            logSize.fetch_add(blockSize);
681            blockPos = HEADER_RESERVED;
682            entries = 0;
683        } else {
684            /* write to the mutation log failed. Disable the log */
685            disabled = true;
686            LOG(EXTENSION_LOG_WARNING,
687                "Disabling access log due to write failures");
688            return false;
689        }
690    }
691
692    return true;
693}
694
695void MutationLog::writeEntry(MutationLogEntry *mle) {
696    if (mle->len() >= blockSize) {
697        throw std::invalid_argument("MutationLog::writeEntry: argument mle "
698                "has length (which is " + std::to_string(mle->len()) +
699                ") greater than or equal to blockSize (which is" +
700                std::to_string(blockSize) + ")");
701    }
702    if (!isEnabled()) {
703        throw std::logic_error("MutationLog::writeEntry: Not valid on "
704                "a disabled log");
705    }
706    if (!isOpen()) {
707        throw std::logic_error("MutationLog::writeEntry: Not valid on "
708                "a closed log");
709    }
710    needWriteAccess();
711
712    size_t len(mle->len());
713    if (blockPos + len > blockSize) {
714        flush();
715    }
716
717    memcpy(blockBuffer.get() + blockPos, mle, len);
718    blockPos += len;
719    ++entries;
720
721    ++itemsLogged[mle->type()];
722
723    delete mle;
724}
725
726static const char* logType(uint8_t t) {
727    switch(t) {
728    case ML_NEW:
729        return "new";
730        break;
731    case ML_COMMIT1:
732        return "commit1";
733        break;
734    case ML_COMMIT2:
735        return "commit2";
736        break;
737    }
738    return "UNKNOWN";
739}
740
741// ----------------------------------------------------------------------
742// Mutation log iterator
743// ----------------------------------------------------------------------
744
745MutationLog::iterator::iterator(const MutationLog* l, bool e)
746  : log(l),
747    entryBuf(),
748    buf(),
749    p(nullptr),
750    offset(l->header().blockSize() * l->header().blockCount()),
751    items(0),
752    isEnd(e)
753{
754}
755
756MutationLog::iterator::iterator(const MutationLog::iterator& mit)
757  : log(mit.log),
758    entryBuf(),
759    buf(),
760    p(nullptr),
761    offset(mit.offset),
762    items(mit.items),
763    isEnd(mit.isEnd)
764{
765    if (mit.buf != NULL) {
766        buf.reset(new uint8_t[log->header().blockSize()]());
767        memcpy(buf.get(), mit.buf.get(), log->header().blockSize());
768        p = buf.get() + (mit.p - mit.buf.get());
769    }
770
771    if (mit.entryBuf != NULL) {
772        entryBuf.reset(new uint8_t[LOG_ENTRY_BUF_SIZE]());
773        memcpy(entryBuf.get(), mit.entryBuf.get(), LOG_ENTRY_BUF_SIZE);
774    }
775}
776
777MutationLog::iterator& MutationLog::iterator::operator=(const MutationLog::iterator& other)
778{
779    log = other.log;
780    if (other.buf != nullptr) {
781        buf.reset(new uint8_t[log->header().blockSize()]());
782        memcpy(buf.get(), other.buf.get(), log->header().blockSize());
783        p = buf.get() + (other.p - other.buf.get());
784    } else {
785        buf = nullptr;
786        p = nullptr;
787    }
788
789    if (other.entryBuf != nullptr) {
790        entryBuf.reset(new uint8_t[LOG_ENTRY_BUF_SIZE]());
791        memcpy(entryBuf.get(), other.entryBuf.get(), LOG_ENTRY_BUF_SIZE);
792    } else {
793        entryBuf = nullptr;
794    }
795
796    offset = other.offset;
797    items = other.items;
798    isEnd = other.isEnd;
799
800    return *this;
801}
802
803MutationLog::iterator::~iterator() {
804}
805
806void MutationLog::iterator::prepItem() {
807    MutationLogEntry *e = MutationLogEntry::newEntry(p,
808                                                     bufferBytesRemaining());
809    if (entryBuf == NULL) {
810        entryBuf.reset(new uint8_t[LOG_ENTRY_BUF_SIZE]());
811    }
812    memcpy(entryBuf.get(), p, e->len());
813}
814
815MutationLog::iterator& MutationLog::iterator::operator++() {
816    if (--items == 0) {
817        nextBlock();
818    } else {
819        size_t l(operator*()->len());
820        p += l;
821
822        prepItem();
823    }
824    return *this;
825}
826
827bool MutationLog::iterator::operator==(const MutationLog::iterator& rhs) const {
828    return log->fd() == rhs.log->fd()
829        && (
830            (isEnd == rhs.isEnd)
831            || (offset == rhs.offset
832                && items == rhs.items));
833}
834
835bool MutationLog::iterator::operator!=(const MutationLog::iterator& rhs) const {
836    return ! operator==(rhs);
837}
838
839const MutationLogEntry* MutationLog::iterator::operator*() {
840    if (entryBuf == nullptr) {
841        throw std::logic_error("MutationLog::iterator::operator*(): "
842                "entryBuf is NULL");
843    }
844    return MutationLogEntry::newEntry(entryBuf.get(), LOG_ENTRY_BUF_SIZE);
845}
846
847size_t MutationLog::iterator::bufferBytesRemaining() {
848    return log->header().blockSize() - (p - buf.get());
849}
850
851void MutationLog::iterator::nextBlock() {
852    if (log->isEnabled() && !log->isOpen()) {
853        throw std::logic_error("MutationLog::iterator::nextBlock: "
854                "log is enabled and not open");
855    }
856
857    if (buf == NULL) {
858        buf.reset(new uint8_t[log->header().blockSize()]());
859    }
860    p = buf.get();
861
862    ssize_t bytesread = pread(log->fd(), buf.get(), log->header().blockSize(),
863                              offset);
864    if (bytesread < 1) {
865        isEnd = true;
866        return;
867    }
868    if (bytesread != (ssize_t)(log->header().blockSize())) {
869        LOG(EXTENSION_LOG_WARNING, "FATAL: too few bytes read in access log"
870                "'%s': %s", log->getLogFile().c_str(), strerror(errno));
871        throw ShortReadException();
872    }
873    offset += bytesread;
874
875    uint32_t crc32(crc32buf(buf.get() + 2, log->header().blockSize() - 2));
876    uint16_t computed_crc16(crc32 & 0xffff);
877    uint16_t retrieved_crc16;
878    memcpy(&retrieved_crc16, buf.get(), sizeof(retrieved_crc16));
879    retrieved_crc16 = ntohs(retrieved_crc16);
880    if (computed_crc16 != retrieved_crc16) {
881        throw CRCReadException();
882    }
883
884    memcpy(&items, buf.get() + 2, 2);
885    items = ntohs(items);
886
887    p = p + 4;
888
889    prepItem();
890}
891
892void MutationLog::resetCounts(size_t *items) {
893    for (int i(0); i < MUTATION_LOG_TYPES; ++i) {
894        itemsLogged[i] = items[i];
895    }
896}
897
898// ----------------------------------------------------------------------
899// Reading entries
900// ----------------------------------------------------------------------
901
902bool MutationLogHarvester::load() {
903    bool clean(false);
904    std::set<uint16_t> shouldClear;
905    for (const MutationLogEntry* le : mlog) {
906        ++itemsSeen[le->type()];
907        clean = false;
908
909        switch (le->type()) {
910        case ML_NEW:
911            if (vbid_set.find(le->vbucket()) != vbid_set.end()) {
912                loading[le->vbucket()].emplace(le->key());
913            }
914            break;
915        case ML_COMMIT2:
916            clean = true;
917
918            for (const uint16_t vb : vbid_set) {
919                for (auto& item : loading[vb]) {
920                    committed[vb].emplace(item);
921                }
922            }
923            loading.clear();
924            break;
925        case ML_COMMIT1:
926            // nothing in particular
927            break;
928        default:
929            throw std::logic_error("MutationLogHarvester::load: Invalid log "
930                    "entry type:" + std::to_string(le->type()));
931        }
932    }
933    return clean;
934}
935
936MutationLog::iterator MutationLogHarvester::loadBatch(
937        const MutationLog::iterator& start, size_t limit) {
938    if (limit == 0) {
939        limit = std::numeric_limits<size_t>::max();
940    }
941    auto it = start;
942    size_t count = 0;
943    committed.clear();
944    for (; it != mlog.end() && count < limit; ++it) {
945        const auto* le = *it;
946        ++itemsSeen[le->type()];
947
948        switch (le->type()) {
949        case ML_NEW:
950            if (vbid_set.find(le->vbucket()) != vbid_set.end()) {
951                committed[le->vbucket()].emplace(le->key());
952                count++;
953            }
954            break;
955
956        case ML_COMMIT2:
957            // We ignore COMMIT2 for Access log, was only relevent to the
958            // 'proper' mutation log.
959            break;
960
961        default:
962            // Just ignore anything else also.
963            break;
964        }
965    }
966    return it;
967}
968
969void MutationLogHarvester::apply(void *arg, mlCallback mlc) {
970    for (const uint16_t vb : vbid_set) {
971        for (const auto& key : committed[vb]) {
972            if (!mlc(arg, vb, key)) { // Stop loading from an access log
973                return;
974            }
975        }
976    }
977}
978
979void MutationLogHarvester::apply(void *arg, mlCallbackWithQueue mlc) {
980    if (engine == nullptr) {
981        throw std::logic_error("MutationLogHarvester::apply: Cannot apply "
982                "when engine is NULL");
983    }
984    for (const uint16_t vb : vbid_set) {
985        RCPtr<VBucket> vbucket = engine->getEpStore()->getVBucket(vb);
986        if (!vbucket) {
987            continue;
988        }
989
990        // Remove any items which are no longer valid in the VBucket.
991        for (auto it = committed[vb].begin(); it != committed[vb].end(); ) {
992            if ((vbucket->ht.find(*it, false) == nullptr)) {
993                it = committed[vb].erase(it);
994            }
995            else {
996                ++it;
997            }
998        }
999
1000        if (!mlc(vb, committed[vb], arg)) {
1001            return;
1002        }
1003        committed[vb].clear();
1004    }
1005}
1006
1007size_t MutationLogHarvester::total() {
1008    size_t rv(0);
1009    for (int i = 0; i < MUTATION_LOG_TYPES; ++i) {
1010        rv += itemsSeen[i];
1011    }
1012    return rv;
1013}
1014
1015// ----------------------------------------------------------------------
1016// Output of entries
1017// ----------------------------------------------------------------------
1018
1019std::ostream& operator <<(std::ostream &out, const MutationLogEntry &mle) {
1020    out << "{MutationLogEntry rowid=" << mle.rowid()
1021        << ", vbucket=" << mle.vbucket()
1022        << ", magic=0x" << std::hex << static_cast<uint16_t>(mle.magic)
1023        << std::dec
1024        << ", type=" << logType(mle.type())
1025        << ", key=``" << mle.key() << "''";
1026    return out;
1027}
1028
1029std::ostream& operator<<(std::ostream& out, const MutationLog& mlog) {
1030    out << "MutationLog{logPath:" << mlog.logPath << ", "
1031        << "blockSize:" << mlog.blockSize << ", "
1032        << "blockPos:" << mlog.blockPos << ", "
1033        << "file:" << mlog.file << ", "
1034        << "disabled:" << mlog.disabled << ", "
1035        << "entries:" << mlog.entries << ", "
1036        << "entryBuffer:" << reinterpret_cast<void*>(mlog.entryBuffer.get())
1037        << ", "
1038        << "blockBuffer:" << reinterpret_cast<void*>(mlog.blockBuffer.get())
1039        << ", "
1040        << "syncConfig:" << int(mlog.syncConfig) << ", "
1041        << "readOnly:" << mlog.readOnly << "}";
1042    return out;
1043}
1044