1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2011 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #include "config.h"
19 
20 #include <sys/stat.h>
21 
22 #include <algorithm>
23 #include <string>
24 #include <utility>
25 
26 extern "C" {
27 #include "crc32.h"
28 }
29 #include "ep_engine.h"
30 #include "mutation_log.h"
31 
/* Printable names of the mutation log entry types; the list ends with NULL. */
const char *mutation_log_type_names[] = {
    "new",
    "del",
    "del_all",
    "commit1",
    "commit2",
    NULL
};
35 
36 
37 #ifdef WIN32
pread(file_handle_t fd, void *buf, size_t nbyte, uint64_t offset)38 ssize_t pread(file_handle_t fd, void *buf, size_t nbyte, uint64_t offset)
39 {
40     DWORD bytesread;
41     OVERLAPPED winoffs;
42     memset(&winoffs, 0, sizeof(winoffs));
43     winoffs.Offset = offset & 0xFFFFFFFF;
44     winoffs.OffsetHigh = (offset >> 32) & 0x7FFFFFFF;
45     if (!ReadFile(fd, buf, nbyte, &bytesread, &winoffs)) {
46         /* luckily we don't check errno so we don't need to care about that */
47         return -1;
48     }
49 
50     return bytesread;
51 }
52 
pwrite(file_handle_t fd, const void *buf, size_t nbyte, uint64_t offset)53 ssize_t pwrite(file_handle_t fd, const void *buf, size_t nbyte,
54                uint64_t offset)
55 {
56     DWORD byteswritten;
57     OVERLAPPED winoffs;
58     memset(&winoffs, 0, sizeof(winoffs));
59     winoffs.Offset = offset & 0xFFFFFFFF;
60     winoffs.OffsetHigh = (offset >> 32) & 0x7FFFFFFF;
61     if (!WriteFile(fd, buf, nbyte, &byteswritten, &winoffs)) {
62         /* luckily we don't check errno so we don't need to care about that */
63         return -1;
64     }
65 
66     return byteswritten;
67 }
68 
doWrite(file_handle_t fd, const uint8_t *buf, size_t nbytes)69 static inline ssize_t doWrite(file_handle_t fd, const uint8_t *buf,
70                               size_t nbytes) {
71     DWORD byteswritten;
72     if (!WriteFile(fd, buf, nbytes, &byteswritten, NULL)) {
73         /* luckily we don't check errno so we don't need to care about that */
74         return -1;
75     }
76 
77     cb_assert(GetLastError() != ERROR_IO_PENDING);
78     return byteswritten;
79 }
80 
doClose(file_handle_t fd)81 static inline int doClose(file_handle_t fd) {
82     if (CloseHandle(fd)) {
83         return 0;
84     } else {
85         return -1;
86     }
87 }
88 
doFsync(file_handle_t fd)89 static inline int doFsync(file_handle_t fd) {
90     if (FlushFileBuffers(fd)) {
91         return 0;
92     } else {
93         return -1;
94     }
95 }
96 
getErrorString(void)97 static inline std::string getErrorString(void) {
98     std::string ret;
99     char* win_msg = NULL;
100     DWORD err = GetLastError();
101     FormatMessageA(FORMAT_MESSAGE_ALLOCATE_BUFFER |
102         FORMAT_MESSAGE_FROM_SYSTEM |
103         FORMAT_MESSAGE_IGNORE_INSERTS,
104         NULL, err,
105         MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
106         (LPTSTR)&win_msg,
107         0, NULL);
108     ret.assign(win_msg);
109     LocalFree(win_msg);
110     return ret;
111 }
112 
SeekFile(file_handle_t fd, const std::string &fname, uint64_t offset, bool end)113 static int64_t SeekFile(file_handle_t fd, const std::string &fname,
114                         uint64_t offset, bool end)
115 {
116     LARGE_INTEGER li;
117     li.QuadPart = offset;
118 
119     if (end) {
120         li.LowPart = SetFilePointer(fd, li.LowPart, &li.HighPart, FILE_END);
121     } else {
122         li.LowPart = SetFilePointer(fd, li.LowPart, &li.HighPart, FILE_BEGIN);
123     }
124 
125     if (li.LowPart == INVALID_SET_FILE_POINTER && GetLastError() != NO_ERROR) {
126         std::stringstream ss;
127         ss << "FATAL: SetFilePointer failed " << fname << ": " <<
128               getErrorString();
129         LOG(EXTENSION_LOG_WARNING, ss.str().c_str());
130         li.QuadPart = -1;
131     }
132 
133     return li.QuadPart;
134 }
135 
OpenFile(const std::string &fname, std::string &error, bool rdonly)136 file_handle_t OpenFile(const std::string &fname, std::string &error,
137                        bool rdonly) {
138     file_handle_t fd;
139     if (rdonly) {
140         fd = CreateFile(const_cast<char*>(fname.c_str()),
141             GENERIC_READ,
142             FILE_SHARE_READ | FILE_SHARE_WRITE,
143             NULL,
144             OPEN_EXISTING,
145             FILE_ATTRIBUTE_NORMAL,
146             NULL);
147     } else {
148         fd = CreateFile(const_cast<char*>(fname.c_str()),
149             GENERIC_READ | GENERIC_WRITE,
150             FILE_SHARE_READ | FILE_SHARE_WRITE,
151             NULL,
152             OPEN_ALWAYS,
153             FILE_ATTRIBUTE_NORMAL,
154             NULL);
155     }
156 
157     if (fd == INVALID_FILE_VALUE) {
158         error.assign(getErrorString());
159     }
160 
161     return fd;
162 }
163 
getFileSize(file_handle_t fd)164 int64_t getFileSize(file_handle_t fd) {
165     LARGE_INTEGER li;
166     if (GetFileSizeEx(fd, &li)) {
167         return li.QuadPart;
168     }
169     abort();
170 }
171 
172 #else
173 
doWrite(file_handle_t fd, const uint8_t *buf, size_t nbytes)174 static inline ssize_t doWrite(file_handle_t fd, const uint8_t *buf,
175                               size_t nbytes) {
176     ssize_t ret;
177     while ((ret = write(fd, buf, nbytes)) == -1 && (errno == EINTR)) {
178         /* Retry */
179     }
180     return ret;
181 }
182 
doClose(file_handle_t fd)183 static inline int doClose(file_handle_t fd) {
184     int ret;
185     while ((ret = close(fd)) == -1 && (errno == EINTR)) {
186         /* Retry */
187     }
188     return ret;
189 }
190 
doFsync(file_handle_t fd)191 static inline int doFsync(file_handle_t fd) {
192     int ret;
193     while ((ret = fsync(fd)) == -1 && (errno == EINTR)) {
194         /* Retry */
195     }
196     return ret;
197 }
198 
SeekFile(file_handle_t fd, const std::string &fname, uint64_t offset, bool end)199 static int64_t SeekFile(file_handle_t fd, const std::string &fname,
200                         uint64_t offset, bool end)
201 {
202     int64_t ret;
203     if (end) {
204         ret = lseek(fd, offset, SEEK_END);
205     } else {
206         ret = lseek(fd, offset, SEEK_SET);
207     }
208 
209     if (ret < 0) {
210         LOG(EXTENSION_LOG_WARNING, "FATAL: lseek failed '%s': %s",
211             fname.c_str(),
212             strerror(errno));
213     }
214     return ret;
215 }
216 
OpenFile(const std::string &fname, std::string &error, bool rdonly)217 file_handle_t OpenFile(const std::string &fname, std::string &error,
218                        bool rdonly) {
219     file_handle_t fd;
220     if (rdonly) {
221         fd = ::open(const_cast<char*>(fname.c_str()), O_RDONLY);
222     } else {
223         fd = ::open(const_cast<char*>(fname.c_str()), O_RDWR | O_CREAT, 0666);
224     }
225 
226     if (fd < 0) {
227         error.assign(strerror(errno));
228     }
229 
230     return fd;
231 }
232 
getFileSize(file_handle_t fd)233 int64_t getFileSize(file_handle_t fd) {
234     struct stat st;
235     int stat_result = fstat(fd, &st);
236     cb_assert(stat_result == 0);
237     return st.st_size;
238 }
239 #endif
240 
241 
writeFully(file_handle_t fd, const uint8_t *buf, size_t nbytes)242 static void writeFully(file_handle_t fd, const uint8_t *buf, size_t nbytes) {
243     while (nbytes > 0) {
244         ssize_t written = doWrite(fd, buf, nbytes);
245         cb_assert(written >= 0);
246 
247         nbytes -= written;
248         buf += written;
249     }
250 }
251 
rowid() const252 uint64_t MutationLogEntry::rowid() const {
253     return ntohll(_rowid);
254 }
255 
MutationLog(const std::string &path, const size_t bs)256 MutationLog::MutationLog(const std::string &path,
257                          const size_t bs)
258     : paddingHisto(GrowingWidthGenerator<uint32_t>(0, 8, 1.5), 32),
259     logPath(path),
260     blockSize(bs),
261     blockPos(HEADER_RESERVED),
262     file(INVALID_FILE_VALUE),
263     disabled(false),
264     entries(0),
265     entryBuffer(static_cast<uint8_t*>(calloc(MutationLogEntry::len(256), 1))),
266     blockBuffer(static_cast<uint8_t*>(calloc(bs, 1))),
267     syncConfig(DEFAULT_SYNC_CONF),
268     readOnly(false)
269 {
270     for (int ii = 0; ii < MUTATION_LOG_TYPES; ++ii) {
271         itemsLogged[ii].store(0);
272     }
273     logSize.store(0);
274 
275     cb_assert(entryBuffer);
276     cb_assert(blockBuffer);
277     if (logPath == "") {
278         disabled = true;
279     }
280 }
281 
~MutationLog()282 MutationLog::~MutationLog() {
283     flush();
284     close();
285     free(entryBuffer);
286     free(blockBuffer);
287 }
288 
disable()289 void MutationLog::disable() {
290     if (file >= 0) {
291         close();
292         disabled = true;
293     }
294 }
295 
newItem(uint16_t vbucket, const std::string &key, uint64_t rowid)296 void MutationLog::newItem(uint16_t vbucket, const std::string &key,
297                           uint64_t rowid) {
298     if (isEnabled()) {
299         MutationLogEntry *mle = MutationLogEntry::newEntry(entryBuffer,
300                                                            rowid, ML_NEW,
301                                                            vbucket, key);
302         writeEntry(mle);
303     }
304 }
305 
delItem(uint16_t vbucket, const std::string &key)306 void MutationLog::delItem(uint16_t vbucket, const std::string &key) {
307     if (isEnabled()) {
308         MutationLogEntry *mle = MutationLogEntry::newEntry(entryBuffer,
309                                                            0, ML_DEL, vbucket,
310                                                            key);
311         writeEntry(mle);
312     }
313 }
314 
deleteAll(uint16_t vbucket)315 void MutationLog::deleteAll(uint16_t vbucket) {
316     if (isEnabled()) {
317         MutationLogEntry *mle = MutationLogEntry::newEntry(entryBuffer,
318                                                            0, ML_DEL_ALL,
319                                                            vbucket, "");
320         writeEntry(mle);
321     }
322 }
323 
sync()324 void MutationLog::sync() {
325     cb_assert(isOpen());
326     BlockTimer timer(&syncTimeHisto);
327     int fsyncResult = doFsync(file);
328     cb_assert(fsyncResult != -1);
329 }
330 
commit1()331 void MutationLog::commit1() {
332     if (isEnabled()) {
333         MutationLogEntry *mle = MutationLogEntry::newEntry(entryBuffer,
334                                                            0, ML_COMMIT1, 0,
335                                                            "");
336         writeEntry(mle);
337         if ((getSyncConfig() & FLUSH_COMMIT_1) != 0) {
338             flush();
339         }
340         if ((getSyncConfig() & SYNC_COMMIT_1) != 0) {
341             sync();
342         }
343     }
344 }
345 
commit2()346 void MutationLog::commit2() {
347     if (isEnabled()) {
348         MutationLogEntry *mle = MutationLogEntry::newEntry(entryBuffer,
349                                                            0, ML_COMMIT2, 0,
350                                                            "");
351         writeEntry(mle);
352         if ((getSyncConfig() & FLUSH_COMMIT_2) != 0) {
353             flush();
354         }
355         if ((getSyncConfig() & SYNC_COMMIT_2) != 0) {
356             sync();
357         }
358     }
359 }
360 
writeInitialBlock()361 bool MutationLog::writeInitialBlock() {
362     cb_assert(!readOnly);
363     cb_assert(isEnabled());
364     cb_assert(isOpen());
365     headerBlock.set(blockSize);
366 
367     writeFully(file, (uint8_t*)&headerBlock, sizeof(headerBlock));
368 
369     int64_t seek_result = SeekFile(file, getLogFile(),
370                             std::max(
371                             static_cast<uint32_t>(MIN_LOG_HEADER_SIZE),
372                             headerBlock.blockSize() * headerBlock.blockCount())
373                              - 1, false);
374     if (seek_result < 0) {
375         LOG(EXTENSION_LOG_WARNING, "FATAL: lseek failed '%s': %s",
376             getLogFile().c_str(), strerror(errno));
377         return false;
378     }
379     uint8_t zero(0);
380     writeFully(file, &zero, sizeof(zero));
381     return true;
382 }
383 
readInitialBlock()384 void MutationLog::readInitialBlock() {
385     cb_assert(isOpen());
386     uint8_t buf[MIN_LOG_HEADER_SIZE];
387     ssize_t bytesread = pread(file, buf, sizeof(buf), 0);
388 
389     if (bytesread != sizeof(buf)) {
390         LOG(EXTENSION_LOG_WARNING, "FATAL: initial block read failed"
391                 "'%s': %s", getLogFile().c_str(), strerror(errno));
392         throw ShortReadException();
393     }
394 
395     headerBlock.set(buf, sizeof(buf));
396 
397     // These are reserved for future use.
398     if (headerBlock.version() != LOG_VERSION ||
399             headerBlock.blockCount() != 1) {
400         std::stringstream ss;
401         ss << "HeaderBlock version/blockCount mismatch";
402         throw ReadException(ss.str());
403     }
404 
405     blockSize = headerBlock.blockSize();
406 }
407 
updateInitialBlock()408 void MutationLog::updateInitialBlock() {
409     cb_assert(!readOnly);
410     cb_assert(isOpen());
411     needWriteAccess();
412 
413     uint8_t buf[MIN_LOG_HEADER_SIZE];
414     memset(buf, 0, sizeof(buf));
415     memcpy(buf, (uint8_t*)&headerBlock, sizeof(headerBlock));
416 
417     ssize_t byteswritten = pwrite(file, buf, sizeof(buf), 0);
418 
419     // @todo we need a write exception
420     if (byteswritten != sizeof(buf)) {
421         throw WriteException("Failed to update header block");
422     }
423 }
424 
prepareWrites()425 bool MutationLog::prepareWrites() {
426     if (isEnabled()) {
427         cb_assert(isOpen());
428         int64_t seek_result = SeekFile(file, getLogFile(), 0, true);
429         if (seek_result < 0) {
430             return false;
431         }
432         int64_t unaligned_bytes = seek_result % blockSize;
433         if (unaligned_bytes != 0) {
434             LOG(EXTENSION_LOG_WARNING,
435                     "WARNING: filesize %d not block aligned", seek_result,
436                     "'%s': %s", getLogFile().c_str(), strerror(errno));
437             if (blockSize < (size_t)seek_result) {
438                 if (SeekFile(file, getLogFile(),
439                     seek_result - unaligned_bytes, false) < 0) {
440                     LOG(EXTENSION_LOG_WARNING, "FATAL: lseek failed '%s': %s",
441                             getLogFile().c_str(), strerror(errno));
442                     return false;
443                 }
444             } else {
445                 throw ShortReadException();
446             }
447         }
448         logSize = static_cast<size_t>(seek_result);
449     }
450     return true;
451 }
452 
/**
 * Translate a configuration keyword into its numeric code.
 *
 * @param s one of "off", "commit1", "commit2" or "full"
 * @return 0, 1, 2 or 3 respectively; 0xff for any other string
 */
static uint8_t parseConfigString(const std::string &s) {
    static const struct {
        const char *keyword;
        uint8_t code;
    } known[] = {
        { "off", 0 },
        { "commit1", 1 },
        { "commit2", 2 },
        { "full", 3 }
    };

    for (size_t i = 0; i < sizeof(known) / sizeof(known[0]); ++i) {
        if (s == known[i].keyword) {
            return known[i].code;
        }
    }
    return 0xff;
}
468 
setSyncConfig(const std::string &s)469 bool MutationLog::setSyncConfig(const std::string &s) {
470     uint8_t v(parseConfigString(s));
471     if (v != 0xff) {
472         syncConfig = (syncConfig & ~SYNC_FULL) | v;
473     }
474     return v != 0xff;
475 }
476 
setFlushConfig(const std::string &s)477 bool MutationLog::setFlushConfig(const std::string &s) {
478     uint8_t v(parseConfigString(s));
479     if (v != 0xff) {
480         syncConfig = (syncConfig & ~FLUSH_FULL) | (v << 2);
481     }
482     return v != 0xff;
483 }
484 
exists() const485 bool MutationLog::exists() const {
486     return access(logPath.c_str(), F_OK) == 0;
487 }
488 
open(bool _readOnly)489 void MutationLog::open(bool _readOnly) {
490     if (!isEnabled()) {
491         return;
492     }
493     readOnly = _readOnly;
494     std::string error;
495     if (readOnly) {
496         if (!exists()) {
497             throw FileNotFoundException(logPath);
498         }
499         file = OpenFile(logPath, error, true);
500     } else {
501         file = OpenFile(logPath, error, false);
502     }
503 
504     if (file == INVALID_FILE_VALUE) {
505         std::stringstream ss;
506         ss << "Unable to open log file: " << error; // strerror(errno);
507         throw ReadException(ss.str());
508     }
509 
510     int64_t size = getFileSize(file);
511     if (size && size < static_cast<int64_t>(MIN_LOG_HEADER_SIZE)) {
512         try {
513             LOG(EXTENSION_LOG_WARNING, "WARNING: Corrupted access log '%s'",
514                     getLogFile().c_str());
515             reset();
516             return;
517         } catch (ShortReadException &) {
518             close();
519             disabled = true;
520             throw ShortReadException();
521         }
522     }
523     if (size == 0) {
524         if (!writeInitialBlock()) {
525             close();
526             disabled = true;
527             return;
528         }
529     } else {
530         try {
531             readInitialBlock();
532         } catch (ShortReadException &) {
533             close();
534             disabled = true;
535             throw ShortReadException();
536         }
537 
538         if (!readOnly) {
539             headerBlock.setRdwr(1);
540             updateInitialBlock();
541         }
542     }
543 
544     if (!prepareWrites()) {
545         close();
546         disabled = true;
547         return;
548     }
549 
550     cb_assert(isOpen());
551 }
552 
close()553 void MutationLog::close() {
554     if (!isEnabled() || !isOpen()) {
555         return;
556     }
557 
558     if (!readOnly) {
559         flush();
560         sync();
561         headerBlock.setRdwr(0);
562         updateInitialBlock();
563     }
564 
565     int close_result = doClose(file);
566     cb_assert(close_result != -1);
567     file = INVALID_FILE_VALUE;
568 }
569 
reset()570 bool MutationLog::reset() {
571     if (!isEnabled()) {
572         return false;
573     }
574     close();
575 
576     if (remove(getLogFile().c_str()) == -1) {
577         LOG(EXTENSION_LOG_WARNING, "FATAL: Failed to remove '%s': %s",
578             getLogFile().c_str(), strerror(errno));
579         return false;
580     }
581 
582     open();
583     LOG(EXTENSION_LOG_INFO, "Reset a mutation log '%s' successfully.",
584         getLogFile().c_str());
585     return true;
586 }
587 
replaceWith(MutationLog &mlog)588 bool MutationLog::replaceWith(MutationLog &mlog) {
589     cb_assert(mlog.isEnabled());
590     cb_assert(isEnabled());
591 
592     mlog.flush();
593     mlog.close();
594     flush();
595     close();
596 
597     for (int i(0); i < MUTATION_LOG_TYPES; ++i) {
598         itemsLogged[i].store(mlog.itemsLogged[i]);
599     }
600 
601     if (rename(mlog.getLogFile().c_str(), getLogFile().c_str()) != 0) {
602         open();
603         std::stringstream ss;
604         ss <<
605         "Unable to rename a mutation log \"" << mlog.getLogFile() << "\" "
606            << "to \"" << getLogFile() << "\": " << strerror(errno);
607         LOG(EXTENSION_LOG_WARNING, "%s!!! Reopened the old log file",
608             ss.str().c_str());
609         return false;
610     }
611 
612     open();
613     LOG(EXTENSION_LOG_INFO,
614         "Renamed a mutation log \"%s\" to \"%s\" and reopened it",
615         mlog.getLogFile().c_str(), getLogFile().c_str());
616     return true;
617 }
618 
flush()619 void MutationLog::flush() {
620     if (isEnabled() && blockPos > HEADER_RESERVED) {
621         cb_assert(isOpen());
622         needWriteAccess();
623         BlockTimer timer(&flushTimeHisto);
624 
625         if (blockPos < blockSize) {
626             size_t padding(blockSize - blockPos);
627             memset(blockBuffer + blockPos, 0x00, padding);
628             paddingHisto.add(padding);
629         }
630 
631         entries = htons(entries);
632         memcpy(blockBuffer + 2, &entries, sizeof(entries));
633 
634         uint32_t crc32(crc32buf(blockBuffer + 2, blockSize - 2));
635         uint16_t crc16(htons(crc32 & 0xffff));
636         memcpy(blockBuffer, &crc16, sizeof(crc16));
637 
638         writeFully(file, blockBuffer, blockSize);
639         logSize.fetch_add(blockSize);
640 
641         blockPos = HEADER_RESERVED;
642         entries = 0;
643     }
644 }
645 
writeEntry(MutationLogEntry *mle)646 void MutationLog::writeEntry(MutationLogEntry *mle) {
647     cb_assert(isEnabled());
648     cb_assert(isOpen());
649     needWriteAccess();
650 
651     size_t len(mle->len());
652     if (blockPos + len > blockSize) {
653         flush();
654     }
655     cb_assert(len < blockSize);
656 
657     memcpy(blockBuffer + blockPos, mle, len);
658     blockPos += len;
659     ++entries;
660 
661     ++itemsLogged[mle->type()];
662 
663     delete mle;
664 }
665 
logType(uint8_t t)666 static const char* logType(uint8_t t) {
667     switch(t) {
668     case ML_NEW:
669         return "new";
670         break;
671     case ML_DEL:
672         return "del";
673         break;
674     case ML_DEL_ALL:
675         return "delall";
676         break;
677     case ML_COMMIT1:
678         return "commit1";
679         break;
680     case ML_COMMIT2:
681         return "commit2";
682         break;
683     }
684     return "UNKNOWN";
685 }
686 
687 // ----------------------------------------------------------------------
688 // Mutation log iterator
689 // ----------------------------------------------------------------------
690 
/**
 * Create an iterator over log l.
 *
 * No buffers are allocated yet. The read offset starts right behind the
 * header area (blockSize * blockCount of the log's header).
 *
 * @param l the log to iterate over; must not be NULL
 * @param e initial value of the end-of-log flag
 */
MutationLog::iterator::iterator(const MutationLog *l, bool e)
  : log(l),
    entryBuf(NULL),
    buf(NULL),
    p(NULL),
    offset(l->header().blockSize() * l->header().blockCount()),
    items(0),
    isEnd(e)
{
    cb_assert(log);
}
702 
/**
 * Copy constructor. The new iterator gets its own copies of the block
 * buffer and the entry buffer (when the source has them), with the read
 * pointer placed at the same position inside the block.
 */
MutationLog::iterator::iterator(const MutationLog::iterator& mit)
  : log(mit.log),
    entryBuf(NULL),
    buf(NULL),
    p(NULL),
    offset(mit.offset),
    items(mit.items),
    isEnd(mit.isEnd)
{
    cb_assert(log);

    if (mit.entryBuf != NULL) {
        entryBuf = static_cast<uint8_t*>(calloc(1, LOG_ENTRY_BUF_SIZE));
        cb_assert(entryBuf);
        memcpy(entryBuf, mit.entryBuf, LOG_ENTRY_BUF_SIZE);
    }

    if (mit.buf != NULL) {
        buf = static_cast<uint8_t*>(calloc(1, log->header().blockSize()));
        cb_assert(buf);
        memcpy(buf, mit.buf, log->header().blockSize());
        // Same distance from the start of the block as in the source.
        p = buf + (mit.p - mit.buf);
    }
}
726 
~iterator()727 MutationLog::iterator::~iterator() {
728     free(entryBuf);
729     free(buf);
730 }
731 
prepItem()732 void MutationLog::iterator::prepItem() {
733     MutationLogEntry *e = MutationLogEntry::newEntry(p,
734                                                      bufferBytesRemaining());
735     if (entryBuf == NULL) {
736         entryBuf = static_cast<uint8_t*>(calloc(1, LOG_ENTRY_BUF_SIZE));
737         cb_assert(entryBuf);
738     }
739     memcpy(entryBuf, p, e->len());
740 }
741 
operator ++()742 MutationLog::iterator& MutationLog::iterator::operator++() {
743     if (--items == 0) {
744         nextBlock();
745     } else {
746         size_t l(operator*()->len());
747         p += l;
748 
749         prepItem();
750     }
751     return *this;
752 }
753 
operator ++(int)754 MutationLog::iterator& MutationLog::iterator::operator++(int) {
755     abort();
756     return *this;
757 }
758 
operator ==(const MutationLog::iterator& rhs)759 bool MutationLog::iterator::operator==(const MutationLog::iterator& rhs) {
760     return log->fd() == rhs.log->fd()
761         && (
762             (isEnd == rhs.isEnd)
763             || (offset == rhs.offset
764                 && items == rhs.items));
765 }
766 
operator !=(const MutationLog::iterator& rhs)767 bool MutationLog::iterator::operator!=(const MutationLog::iterator& rhs) {
768     return ! operator==(rhs);
769 }
770 
operator *()771 const MutationLogEntry* MutationLog::iterator::operator*() {
772     cb_assert(entryBuf != NULL);
773     return MutationLogEntry::newEntry(entryBuf, LOG_ENTRY_BUF_SIZE);
774 }
775 
bufferBytesRemaining()776 size_t MutationLog::iterator::bufferBytesRemaining() {
777     return log->header().blockSize() - (p - buf);
778 }
779 
nextBlock()780 void MutationLog::iterator::nextBlock() {
781     cb_assert(!log->isEnabled() || log->isOpen());
782     if (buf == NULL) {
783         buf = static_cast<uint8_t*>(calloc(1, log->header().blockSize()));
784         cb_assert(buf);
785     }
786     p = buf;
787 
788     ssize_t bytesread = pread(log->fd(), buf, log->header().blockSize(),
789                               offset);
790     if (bytesread < 1) {
791         isEnd = true;
792         return;
793     }
794     if (bytesread != (ssize_t)(log->header().blockSize())) {
795         LOG(EXTENSION_LOG_WARNING, "FATAL: too few bytes read in access log"
796                 "'%s': %s", log->getLogFile().c_str(), strerror(errno));
797         throw ShortReadException();
798     }
799     offset += bytesread;
800 
801     uint32_t crc32(crc32buf(buf + 2, log->header().blockSize() - 2));
802     uint16_t computed_crc16(crc32 & 0xffff);
803     uint16_t retrieved_crc16;
804     memcpy(&retrieved_crc16, buf, sizeof(retrieved_crc16));
805     retrieved_crc16 = ntohs(retrieved_crc16);
806     if (computed_crc16 != retrieved_crc16) {
807         throw CRCReadException();
808     }
809 
810     memcpy(&items, buf + 2, 2);
811     items = ntohs(items);
812 
813     p = p + 4;
814 
815     prepItem();
816 }
817 
resetCounts(size_t *items)818 void MutationLog::resetCounts(size_t *items) {
819     for (int i(0); i < MUTATION_LOG_TYPES; ++i) {
820         itemsLogged[i] = items[i];
821     }
822 }
823 
824 // ----------------------------------------------------------------------
825 // Reading entries
826 // ----------------------------------------------------------------------
827 
load()828 bool MutationLogHarvester::load() {
829     bool clean(false);
830     std::set<uint16_t> shouldClear;
831     for (MutationLog::iterator it(mlog.begin()); it != mlog.end(); ++it) {
832         const MutationLogEntry *le = *it;
833         ++itemsSeen[le->type()];
834         clean = false;
835 
836         switch (le->type()) {
837         case ML_DEL:
838             // FALLTHROUGH
839         case ML_NEW:
840             if (vbid_set.find(le->vbucket()) != vbid_set.end()) {
841                 loading[le->vbucket()][le->key()] =
842                                       std::make_pair(le->rowid(), le->type());
843             }
844             break;
845         case ML_COMMIT2: {
846             clean = true;
847             for (std::set<uint16_t>::iterator vit(shouldClear.begin());
848                  vit != shouldClear.end(); ++vit) {
849                 committed[*vit].clear();
850             }
851             shouldClear.clear();
852 
853             for (std::set<uint16_t>::const_iterator vit = vbid_set.begin();
854                  vit != vbid_set.end(); ++vit) {
855                 uint16_t vb(*vit);
856 
857                 unordered_map<std::string, mutation_log_event_t>::iterator
858                               copyit2;
859                 for (copyit2 = loading[vb].begin();
860                      copyit2 != loading[vb].end();
861                      ++copyit2) {
862 
863                     mutation_log_event_t t = copyit2->second;
864 
865                     switch (t.second) {
866                     case ML_NEW:
867                         committed[vb][copyit2->first] = t.first;
868                         break;
869                     case ML_DEL:
870                         committed[vb].erase(copyit2->first);
871                         break;
872                     default:
873                         abort();
874                     }
875                 }
876             }
877         }
878             loading.clear();
879             break;
880         case ML_COMMIT1:
881             // nothing in particular
882             break;
883         case ML_DEL_ALL:
884             if (vbid_set.find(le->vbucket()) != vbid_set.end()) {
885                 loading[le->vbucket()].clear();
886                 shouldClear.insert(le->vbucket());
887             }
888             break;
889         default:
890             abort();
891         }
892     }
893     return clean;
894 }
895 
apply(void *arg, mlCallback mlc)896 void MutationLogHarvester::apply(void *arg, mlCallback mlc) {
897     for (std::set<uint16_t>::const_iterator it = vbid_set.begin();
898          it != vbid_set.end(); ++it) {
899         uint16_t vb(*it);
900 
901         for (unordered_map<std::string, uint64_t>::iterator it2 =
902                                                        committed[vb].begin();
903              it2 != committed[vb].end(); ++it2) {
904             const std::string key(it2->first);
905             uint64_t rowid(it2->second);
906             if (!mlc(arg, vb, key, rowid)) { // Stop loading from an access log
907                 return;
908             }
909         }
910     }
911 }
912 
apply(void *arg, mlCallbackWithQueue mlc)913 void MutationLogHarvester::apply(void *arg, mlCallbackWithQueue mlc) {
914     cb_assert(engine);
915     std::vector<std::pair<std::string, uint64_t> > fetches;
916     std::set<uint16_t>::const_iterator it = vbid_set.begin();
917     for (; it != vbid_set.end(); ++it) {
918         uint16_t vb(*it);
919         RCPtr<VBucket> vbucket = engine->getEpStore()->getVBucket(vb);
920         if (!vbucket) {
921             continue;
922         }
923         unordered_map<std::string, uint64_t>::iterator it2 =
924                                                          committed[vb].begin();
925         for (; it2 != committed[vb].end(); ++it2) {
926             // cannot use rowid from access log, so must read from hashtable
927             std::string key = it2->first;
928             StoredValue *v = NULL;
929             if ((v = vbucket->ht.find(key, false))) {
930                 fetches.push_back(std::make_pair(it2->first, v->getBySeqno()));
931             }
932         }
933         if (!mlc(vb, fetches, arg)) {
934             return;
935         }
936         fetches.clear();
937     }
938 }
939 
getUncommitted( std::vector<mutation_log_uncommitted_t> &uitems)940 void MutationLogHarvester::getUncommitted(
941                              std::vector<mutation_log_uncommitted_t> &uitems) {
942 
943     for (std::set<uint16_t>::const_iterator vit = vbid_set.begin();
944          vit != vbid_set.end(); ++vit) {
945         uint16_t vb(*vit);
946         mutation_log_uncommitted_t leftover;
947         leftover.vbucket = vb;
948 
949         unordered_map<std::string, mutation_log_event_t>::iterator copyit2;
950         for (copyit2 = loading[vb].begin();
951              copyit2 != loading[vb].end();
952              ++copyit2) {
953 
954             mutation_log_event_t t = copyit2->second;
955             leftover.key = copyit2->first;
956             leftover.rowid = t.first;
957             leftover.type = static_cast<mutation_log_type_t>(t.second);
958 
959             uitems.push_back(leftover);
960         }
961     }
962 }
963 
total()964 size_t MutationLogHarvester::total() {
965     size_t rv(0);
966     for (int i = 0; i < MUTATION_LOG_TYPES; ++i) {
967         rv += itemsSeen[i];
968     }
969     return rv;
970 }
971 
972 // ----------------------------------------------------------------------
973 // Output of entries
974 // ----------------------------------------------------------------------
975 
operator <<(std::ostream &out, const MutationLogEntry &mle)976 std::ostream& operator <<(std::ostream &out, const MutationLogEntry &mle) {
977     out << "{MutationLogEntry rowid=" << mle.rowid()
978         << ", vbucket=" << mle.vbucket()
979         << ", magic=0x" << std::hex << static_cast<uint16_t>(mle.magic)
980         << std::dec
981         << ", type=" << logType(mle.type())
982         << ", key=``" << mle.key() << "''";
983     return out;
984 }
985