1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3 * Copyright 2011 Couchbase, Inc
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 #include "config.h"
19
20 #include <sys/stat.h>
21
22 #include <algorithm>
23 #include <string>
24 #include <utility>
25
26 extern "C" {
27 #include "crc32.h"
28 }
29 #include "ep_engine.h"
30 #include "mutation_log.h"
31
/*
 * Printable names for the mutation log entry types, in type order.
 * The table is terminated by a NULL sentinel.
 */
const char *mutation_log_type_names[] = {
    "new",
    "del",
    "del_all",
    "commit1",
    "commit2",
    NULL
};
35
36
37 #ifdef WIN32
pread(file_handle_t fd, void *buf, size_t nbyte, uint64_t offset)38 ssize_t pread(file_handle_t fd, void *buf, size_t nbyte, uint64_t offset)
39 {
40 DWORD bytesread;
41 OVERLAPPED winoffs;
42 memset(&winoffs, 0, sizeof(winoffs));
43 winoffs.Offset = offset & 0xFFFFFFFF;
44 winoffs.OffsetHigh = (offset >> 32) & 0x7FFFFFFF;
45 if (!ReadFile(fd, buf, nbyte, &bytesread, &winoffs)) {
46 /* luckily we don't check errno so we don't need to care about that */
47 return -1;
48 }
49
50 return bytesread;
51 }
52
pwrite(file_handle_t fd, const void *buf, size_t nbyte, uint64_t offset)53 ssize_t pwrite(file_handle_t fd, const void *buf, size_t nbyte,
54 uint64_t offset)
55 {
56 DWORD byteswritten;
57 OVERLAPPED winoffs;
58 memset(&winoffs, 0, sizeof(winoffs));
59 winoffs.Offset = offset & 0xFFFFFFFF;
60 winoffs.OffsetHigh = (offset >> 32) & 0x7FFFFFFF;
61 if (!WriteFile(fd, buf, nbyte, &byteswritten, &winoffs)) {
62 /* luckily we don't check errno so we don't need to care about that */
63 return -1;
64 }
65
66 return byteswritten;
67 }
68
doWrite(file_handle_t fd, const uint8_t *buf, size_t nbytes)69 static inline ssize_t doWrite(file_handle_t fd, const uint8_t *buf,
70 size_t nbytes) {
71 DWORD byteswritten;
72 if (!WriteFile(fd, buf, nbytes, &byteswritten, NULL)) {
73 /* luckily we don't check errno so we don't need to care about that */
74 return -1;
75 }
76
77 cb_assert(GetLastError() != ERROR_IO_PENDING);
78 return byteswritten;
79 }
80
doClose(file_handle_t fd)81 static inline int doClose(file_handle_t fd) {
82 if (CloseHandle(fd)) {
83 return 0;
84 } else {
85 return -1;
86 }
87 }
88
doFsync(file_handle_t fd)89 static inline int doFsync(file_handle_t fd) {
90 if (FlushFileBuffers(fd)) {
91 return 0;
92 } else {
93 return -1;
94 }
95 }
96
getErrorString(void)97 static inline std::string getErrorString(void) {
98 std::string ret;
99 char* win_msg = NULL;
100 DWORD err = GetLastError();
101 FormatMessageA(FORMAT_MESSAGE_ALLOCATE_BUFFER |
102 FORMAT_MESSAGE_FROM_SYSTEM |
103 FORMAT_MESSAGE_IGNORE_INSERTS,
104 NULL, err,
105 MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
106 (LPTSTR)&win_msg,
107 0, NULL);
108 ret.assign(win_msg);
109 LocalFree(win_msg);
110 return ret;
111 }
112
SeekFile(file_handle_t fd, const std::string &fname, uint64_t offset, bool end)113 static int64_t SeekFile(file_handle_t fd, const std::string &fname,
114 uint64_t offset, bool end)
115 {
116 LARGE_INTEGER li;
117 li.QuadPart = offset;
118
119 if (end) {
120 li.LowPart = SetFilePointer(fd, li.LowPart, &li.HighPart, FILE_END);
121 } else {
122 li.LowPart = SetFilePointer(fd, li.LowPart, &li.HighPart, FILE_BEGIN);
123 }
124
125 if (li.LowPart == INVALID_SET_FILE_POINTER && GetLastError() != NO_ERROR) {
126 std::stringstream ss;
127 ss << "FATAL: SetFilePointer failed " << fname << ": " <<
128 getErrorString();
129 LOG(EXTENSION_LOG_WARNING, ss.str().c_str());
130 li.QuadPart = -1;
131 }
132
133 return li.QuadPart;
134 }
135
OpenFile(const std::string &fname, std::string &error, bool rdonly)136 file_handle_t OpenFile(const std::string &fname, std::string &error,
137 bool rdonly) {
138 file_handle_t fd;
139 if (rdonly) {
140 fd = CreateFile(const_cast<char*>(fname.c_str()),
141 GENERIC_READ,
142 FILE_SHARE_READ | FILE_SHARE_WRITE,
143 NULL,
144 OPEN_EXISTING,
145 FILE_ATTRIBUTE_NORMAL,
146 NULL);
147 } else {
148 fd = CreateFile(const_cast<char*>(fname.c_str()),
149 GENERIC_READ | GENERIC_WRITE,
150 FILE_SHARE_READ | FILE_SHARE_WRITE,
151 NULL,
152 OPEN_ALWAYS,
153 FILE_ATTRIBUTE_NORMAL,
154 NULL);
155 }
156
157 if (fd == INVALID_FILE_VALUE) {
158 error.assign(getErrorString());
159 }
160
161 return fd;
162 }
163
getFileSize(file_handle_t fd)164 int64_t getFileSize(file_handle_t fd) {
165 LARGE_INTEGER li;
166 if (GetFileSizeEx(fd, &li)) {
167 return li.QuadPart;
168 }
169 abort();
170 }
171
172 #else
173
doWrite(file_handle_t fd, const uint8_t *buf, size_t nbytes)174 static inline ssize_t doWrite(file_handle_t fd, const uint8_t *buf,
175 size_t nbytes) {
176 ssize_t ret;
177 while ((ret = write(fd, buf, nbytes)) == -1 && (errno == EINTR)) {
178 /* Retry */
179 }
180 return ret;
181 }
182
doClose(file_handle_t fd)183 static inline int doClose(file_handle_t fd) {
184 int ret;
185 while ((ret = close(fd)) == -1 && (errno == EINTR)) {
186 /* Retry */
187 }
188 return ret;
189 }
190
doFsync(file_handle_t fd)191 static inline int doFsync(file_handle_t fd) {
192 int ret;
193 while ((ret = fsync(fd)) == -1 && (errno == EINTR)) {
194 /* Retry */
195 }
196 return ret;
197 }
198
SeekFile(file_handle_t fd, const std::string &fname, uint64_t offset, bool end)199 static int64_t SeekFile(file_handle_t fd, const std::string &fname,
200 uint64_t offset, bool end)
201 {
202 int64_t ret;
203 if (end) {
204 ret = lseek(fd, offset, SEEK_END);
205 } else {
206 ret = lseek(fd, offset, SEEK_SET);
207 }
208
209 if (ret < 0) {
210 LOG(EXTENSION_LOG_WARNING, "FATAL: lseek failed '%s': %s",
211 fname.c_str(),
212 strerror(errno));
213 }
214 return ret;
215 }
216
OpenFile(const std::string &fname, std::string &error, bool rdonly)217 file_handle_t OpenFile(const std::string &fname, std::string &error,
218 bool rdonly) {
219 file_handle_t fd;
220 if (rdonly) {
221 fd = ::open(const_cast<char*>(fname.c_str()), O_RDONLY);
222 } else {
223 fd = ::open(const_cast<char*>(fname.c_str()), O_RDWR | O_CREAT, 0666);
224 }
225
226 if (fd < 0) {
227 error.assign(strerror(errno));
228 }
229
230 return fd;
231 }
232
getFileSize(file_handle_t fd)233 int64_t getFileSize(file_handle_t fd) {
234 struct stat st;
235 int stat_result = fstat(fd, &st);
236 cb_assert(stat_result == 0);
237 return st.st_size;
238 }
239 #endif
240
241
writeFully(file_handle_t fd, const uint8_t *buf, size_t nbytes)242 static void writeFully(file_handle_t fd, const uint8_t *buf, size_t nbytes) {
243 while (nbytes > 0) {
244 ssize_t written = doWrite(fd, buf, nbytes);
245 cb_assert(written >= 0);
246
247 nbytes -= written;
248 buf += written;
249 }
250 }
251
rowid() const252 uint64_t MutationLogEntry::rowid() const {
253 return ntohll(_rowid);
254 }
255
MutationLog(const std::string &path, const size_t bs)256 MutationLog::MutationLog(const std::string &path,
257 const size_t bs)
258 : paddingHisto(GrowingWidthGenerator<uint32_t>(0, 8, 1.5), 32),
259 logPath(path),
260 blockSize(bs),
261 blockPos(HEADER_RESERVED),
262 file(INVALID_FILE_VALUE),
263 disabled(false),
264 entries(0),
265 entryBuffer(static_cast<uint8_t*>(calloc(MutationLogEntry::len(256), 1))),
266 blockBuffer(static_cast<uint8_t*>(calloc(bs, 1))),
267 syncConfig(DEFAULT_SYNC_CONF),
268 readOnly(false)
269 {
270 for (int ii = 0; ii < MUTATION_LOG_TYPES; ++ii) {
271 itemsLogged[ii].store(0);
272 }
273 logSize.store(0);
274
275 cb_assert(entryBuffer);
276 cb_assert(blockBuffer);
277 if (logPath == "") {
278 disabled = true;
279 }
280 }
281
~MutationLog()282 MutationLog::~MutationLog() {
283 flush();
284 close();
285 free(entryBuffer);
286 free(blockBuffer);
287 }
288
disable()289 void MutationLog::disable() {
290 if (file >= 0) {
291 close();
292 disabled = true;
293 }
294 }
295
newItem(uint16_t vbucket, const std::string &key, uint64_t rowid)296 void MutationLog::newItem(uint16_t vbucket, const std::string &key,
297 uint64_t rowid) {
298 if (isEnabled()) {
299 MutationLogEntry *mle = MutationLogEntry::newEntry(entryBuffer,
300 rowid, ML_NEW,
301 vbucket, key);
302 writeEntry(mle);
303 }
304 }
305
delItem(uint16_t vbucket, const std::string &key)306 void MutationLog::delItem(uint16_t vbucket, const std::string &key) {
307 if (isEnabled()) {
308 MutationLogEntry *mle = MutationLogEntry::newEntry(entryBuffer,
309 0, ML_DEL, vbucket,
310 key);
311 writeEntry(mle);
312 }
313 }
314
deleteAll(uint16_t vbucket)315 void MutationLog::deleteAll(uint16_t vbucket) {
316 if (isEnabled()) {
317 MutationLogEntry *mle = MutationLogEntry::newEntry(entryBuffer,
318 0, ML_DEL_ALL,
319 vbucket, "");
320 writeEntry(mle);
321 }
322 }
323
sync()324 void MutationLog::sync() {
325 cb_assert(isOpen());
326 BlockTimer timer(&syncTimeHisto);
327 int fsyncResult = doFsync(file);
328 cb_assert(fsyncResult != -1);
329 }
330
commit1()331 void MutationLog::commit1() {
332 if (isEnabled()) {
333 MutationLogEntry *mle = MutationLogEntry::newEntry(entryBuffer,
334 0, ML_COMMIT1, 0,
335 "");
336 writeEntry(mle);
337 if ((getSyncConfig() & FLUSH_COMMIT_1) != 0) {
338 flush();
339 }
340 if ((getSyncConfig() & SYNC_COMMIT_1) != 0) {
341 sync();
342 }
343 }
344 }
345
commit2()346 void MutationLog::commit2() {
347 if (isEnabled()) {
348 MutationLogEntry *mle = MutationLogEntry::newEntry(entryBuffer,
349 0, ML_COMMIT2, 0,
350 "");
351 writeEntry(mle);
352 if ((getSyncConfig() & FLUSH_COMMIT_2) != 0) {
353 flush();
354 }
355 if ((getSyncConfig() & SYNC_COMMIT_2) != 0) {
356 sync();
357 }
358 }
359 }
360
writeInitialBlock()361 bool MutationLog::writeInitialBlock() {
362 cb_assert(!readOnly);
363 cb_assert(isEnabled());
364 cb_assert(isOpen());
365 headerBlock.set(blockSize);
366
367 writeFully(file, (uint8_t*)&headerBlock, sizeof(headerBlock));
368
369 int64_t seek_result = SeekFile(file, getLogFile(),
370 std::max(
371 static_cast<uint32_t>(MIN_LOG_HEADER_SIZE),
372 headerBlock.blockSize() * headerBlock.blockCount())
373 - 1, false);
374 if (seek_result < 0) {
375 LOG(EXTENSION_LOG_WARNING, "FATAL: lseek failed '%s': %s",
376 getLogFile().c_str(), strerror(errno));
377 return false;
378 }
379 uint8_t zero(0);
380 writeFully(file, &zero, sizeof(zero));
381 return true;
382 }
383
readInitialBlock()384 void MutationLog::readInitialBlock() {
385 cb_assert(isOpen());
386 uint8_t buf[MIN_LOG_HEADER_SIZE];
387 ssize_t bytesread = pread(file, buf, sizeof(buf), 0);
388
389 if (bytesread != sizeof(buf)) {
390 LOG(EXTENSION_LOG_WARNING, "FATAL: initial block read failed"
391 "'%s': %s", getLogFile().c_str(), strerror(errno));
392 throw ShortReadException();
393 }
394
395 headerBlock.set(buf, sizeof(buf));
396
397 // These are reserved for future use.
398 if (headerBlock.version() != LOG_VERSION ||
399 headerBlock.blockCount() != 1) {
400 std::stringstream ss;
401 ss << "HeaderBlock version/blockCount mismatch";
402 throw ReadException(ss.str());
403 }
404
405 blockSize = headerBlock.blockSize();
406 }
407
updateInitialBlock()408 void MutationLog::updateInitialBlock() {
409 cb_assert(!readOnly);
410 cb_assert(isOpen());
411 needWriteAccess();
412
413 uint8_t buf[MIN_LOG_HEADER_SIZE];
414 memset(buf, 0, sizeof(buf));
415 memcpy(buf, (uint8_t*)&headerBlock, sizeof(headerBlock));
416
417 ssize_t byteswritten = pwrite(file, buf, sizeof(buf), 0);
418
419 // @todo we need a write exception
420 if (byteswritten != sizeof(buf)) {
421 throw WriteException("Failed to update header block");
422 }
423 }
424
prepareWrites()425 bool MutationLog::prepareWrites() {
426 if (isEnabled()) {
427 cb_assert(isOpen());
428 int64_t seek_result = SeekFile(file, getLogFile(), 0, true);
429 if (seek_result < 0) {
430 return false;
431 }
432 int64_t unaligned_bytes = seek_result % blockSize;
433 if (unaligned_bytes != 0) {
434 LOG(EXTENSION_LOG_WARNING,
435 "WARNING: filesize %d not block aligned", seek_result,
436 "'%s': %s", getLogFile().c_str(), strerror(errno));
437 if (blockSize < (size_t)seek_result) {
438 if (SeekFile(file, getLogFile(),
439 seek_result - unaligned_bytes, false) < 0) {
440 LOG(EXTENSION_LOG_WARNING, "FATAL: lseek failed '%s': %s",
441 getLogFile().c_str(), strerror(errno));
442 return false;
443 }
444 } else {
445 throw ShortReadException();
446 }
447 }
448 logSize = static_cast<size_t>(seek_result);
449 }
450 return true;
451 }
452
/*
 * Map a sync/flush level name to its numeric level:
 * "off" -> 0, "commit1" -> 1, "commit2" -> 2, "full" -> 3.
 * Any other string yields the sentinel 0xff.
 */
static uint8_t parseConfigString(const std::string &s) {
    static const char *const levelNames[] = {
        "off", "commit1", "commit2", "full"
    };
    const uint8_t levelCount =
        static_cast<uint8_t>(sizeof(levelNames) / sizeof(levelNames[0]));

    for (uint8_t level = 0; level < levelCount; ++level) {
        if (s == levelNames[level]) {
            return level;
        }
    }
    return 0xff;
}
468
setSyncConfig(const std::string &s)469 bool MutationLog::setSyncConfig(const std::string &s) {
470 uint8_t v(parseConfigString(s));
471 if (v != 0xff) {
472 syncConfig = (syncConfig & ~SYNC_FULL) | v;
473 }
474 return v != 0xff;
475 }
476
setFlushConfig(const std::string &s)477 bool MutationLog::setFlushConfig(const std::string &s) {
478 uint8_t v(parseConfigString(s));
479 if (v != 0xff) {
480 syncConfig = (syncConfig & ~FLUSH_FULL) | (v << 2);
481 }
482 return v != 0xff;
483 }
484
exists() const485 bool MutationLog::exists() const {
486 return access(logPath.c_str(), F_OK) == 0;
487 }
488
open(bool _readOnly)489 void MutationLog::open(bool _readOnly) {
490 if (!isEnabled()) {
491 return;
492 }
493 readOnly = _readOnly;
494 std::string error;
495 if (readOnly) {
496 if (!exists()) {
497 throw FileNotFoundException(logPath);
498 }
499 file = OpenFile(logPath, error, true);
500 } else {
501 file = OpenFile(logPath, error, false);
502 }
503
504 if (file == INVALID_FILE_VALUE) {
505 std::stringstream ss;
506 ss << "Unable to open log file: " << error; // strerror(errno);
507 throw ReadException(ss.str());
508 }
509
510 int64_t size = getFileSize(file);
511 if (size && size < static_cast<int64_t>(MIN_LOG_HEADER_SIZE)) {
512 try {
513 LOG(EXTENSION_LOG_WARNING, "WARNING: Corrupted access log '%s'",
514 getLogFile().c_str());
515 reset();
516 return;
517 } catch (ShortReadException &) {
518 close();
519 disabled = true;
520 throw ShortReadException();
521 }
522 }
523 if (size == 0) {
524 if (!writeInitialBlock()) {
525 close();
526 disabled = true;
527 return;
528 }
529 } else {
530 try {
531 readInitialBlock();
532 } catch (ShortReadException &) {
533 close();
534 disabled = true;
535 throw ShortReadException();
536 }
537
538 if (!readOnly) {
539 headerBlock.setRdwr(1);
540 updateInitialBlock();
541 }
542 }
543
544 if (!prepareWrites()) {
545 close();
546 disabled = true;
547 return;
548 }
549
550 cb_assert(isOpen());
551 }
552
close()553 void MutationLog::close() {
554 if (!isEnabled() || !isOpen()) {
555 return;
556 }
557
558 if (!readOnly) {
559 flush();
560 sync();
561 headerBlock.setRdwr(0);
562 updateInitialBlock();
563 }
564
565 int close_result = doClose(file);
566 cb_assert(close_result != -1);
567 file = INVALID_FILE_VALUE;
568 }
569
reset()570 bool MutationLog::reset() {
571 if (!isEnabled()) {
572 return false;
573 }
574 close();
575
576 if (remove(getLogFile().c_str()) == -1) {
577 LOG(EXTENSION_LOG_WARNING, "FATAL: Failed to remove '%s': %s",
578 getLogFile().c_str(), strerror(errno));
579 return false;
580 }
581
582 open();
583 LOG(EXTENSION_LOG_INFO, "Reset a mutation log '%s' successfully.",
584 getLogFile().c_str());
585 return true;
586 }
587
replaceWith(MutationLog &mlog)588 bool MutationLog::replaceWith(MutationLog &mlog) {
589 cb_assert(mlog.isEnabled());
590 cb_assert(isEnabled());
591
592 mlog.flush();
593 mlog.close();
594 flush();
595 close();
596
597 for (int i(0); i < MUTATION_LOG_TYPES; ++i) {
598 itemsLogged[i].store(mlog.itemsLogged[i]);
599 }
600
601 if (rename(mlog.getLogFile().c_str(), getLogFile().c_str()) != 0) {
602 open();
603 std::stringstream ss;
604 ss <<
605 "Unable to rename a mutation log \"" << mlog.getLogFile() << "\" "
606 << "to \"" << getLogFile() << "\": " << strerror(errno);
607 LOG(EXTENSION_LOG_WARNING, "%s!!! Reopened the old log file",
608 ss.str().c_str());
609 return false;
610 }
611
612 open();
613 LOG(EXTENSION_LOG_INFO,
614 "Renamed a mutation log \"%s\" to \"%s\" and reopened it",
615 mlog.getLogFile().c_str(), getLogFile().c_str());
616 return true;
617 }
618
flush()619 void MutationLog::flush() {
620 if (isEnabled() && blockPos > HEADER_RESERVED) {
621 cb_assert(isOpen());
622 needWriteAccess();
623 BlockTimer timer(&flushTimeHisto);
624
625 if (blockPos < blockSize) {
626 size_t padding(blockSize - blockPos);
627 memset(blockBuffer + blockPos, 0x00, padding);
628 paddingHisto.add(padding);
629 }
630
631 entries = htons(entries);
632 memcpy(blockBuffer + 2, &entries, sizeof(entries));
633
634 uint32_t crc32(crc32buf(blockBuffer + 2, blockSize - 2));
635 uint16_t crc16(htons(crc32 & 0xffff));
636 memcpy(blockBuffer, &crc16, sizeof(crc16));
637
638 writeFully(file, blockBuffer, blockSize);
639 logSize.fetch_add(blockSize);
640
641 blockPos = HEADER_RESERVED;
642 entries = 0;
643 }
644 }
645
writeEntry(MutationLogEntry *mle)646 void MutationLog::writeEntry(MutationLogEntry *mle) {
647 cb_assert(isEnabled());
648 cb_assert(isOpen());
649 needWriteAccess();
650
651 size_t len(mle->len());
652 if (blockPos + len > blockSize) {
653 flush();
654 }
655 cb_assert(len < blockSize);
656
657 memcpy(blockBuffer + blockPos, mle, len);
658 blockPos += len;
659 ++entries;
660
661 ++itemsLogged[mle->type()];
662
663 delete mle;
664 }
665
logType(uint8_t t)666 static const char* logType(uint8_t t) {
667 switch(t) {
668 case ML_NEW:
669 return "new";
670 break;
671 case ML_DEL:
672 return "del";
673 break;
674 case ML_DEL_ALL:
675 return "delall";
676 break;
677 case ML_COMMIT1:
678 return "commit1";
679 break;
680 case ML_COMMIT2:
681 return "commit2";
682 break;
683 }
684 return "UNKNOWN";
685 }
686
687 // ----------------------------------------------------------------------
688 // Mutation log iterator
689 // ----------------------------------------------------------------------
690
/*
 * Create an iterator over log `l`. Reading starts right after the header
 * area (blockSize * blockCount from the header). `e` marks an end
 * iterator. No block is read here; the buffers are allocated lazily.
 *
 * p is initialised to NULL rather than from `buf`. Initialising it from buf
 * silently relied on buf being declared before p in the class; if it is
 * not, p reads an indeterminate value. The copy constructor already uses
 * NULL.
 */
MutationLog::iterator::iterator(const MutationLog *l, bool e)
  : log(l),
    entryBuf(NULL),
    buf(NULL),
    p(NULL),
    offset(l->header().blockSize() * l->header().blockCount()),
    items(0),
    isEnd(e)
{
    cb_assert(log);
}
702
/*
 * Deep-copy an iterator. It gets its own block buffer (with p pointing at
 * the same relative position) and its own entry buffer, when the source
 * has them.
 */
MutationLog::iterator::iterator(const MutationLog::iterator& mit)
  : log(mit.log),
    entryBuf(NULL),
    buf(NULL),
    p(NULL),
    offset(mit.offset),
    items(mit.items),
    isEnd(mit.isEnd)
{
    cb_assert(log);

    if (mit.buf) {
        const size_t blockBytes = log->header().blockSize();
        buf = static_cast<uint8_t*>(calloc(1, blockBytes));
        cb_assert(buf);
        memcpy(buf, mit.buf, blockBytes);
        const ptrdiff_t position = mit.p - mit.buf;
        p = buf + position;
    }

    if (mit.entryBuf) {
        entryBuf = static_cast<uint8_t*>(calloc(1, LOG_ENTRY_BUF_SIZE));
        cb_assert(entryBuf);
        memcpy(entryBuf, mit.entryBuf, LOG_ENTRY_BUF_SIZE);
    }
}
726
~iterator()727 MutationLog::iterator::~iterator() {
728 free(entryBuf);
729 free(buf);
730 }
731
prepItem()732 void MutationLog::iterator::prepItem() {
733 MutationLogEntry *e = MutationLogEntry::newEntry(p,
734 bufferBytesRemaining());
735 if (entryBuf == NULL) {
736 entryBuf = static_cast<uint8_t*>(calloc(1, LOG_ENTRY_BUF_SIZE));
737 cb_assert(entryBuf);
738 }
739 memcpy(entryBuf, p, e->len());
740 }
741
operator ++()742 MutationLog::iterator& MutationLog::iterator::operator++() {
743 if (--items == 0) {
744 nextBlock();
745 } else {
746 size_t l(operator*()->len());
747 p += l;
748
749 prepItem();
750 }
751 return *this;
752 }
753
operator ++(int)754 MutationLog::iterator& MutationLog::iterator::operator++(int) {
755 abort();
756 return *this;
757 }
758
operator ==(const MutationLog::iterator& rhs)759 bool MutationLog::iterator::operator==(const MutationLog::iterator& rhs) {
760 return log->fd() == rhs.log->fd()
761 && (
762 (isEnd == rhs.isEnd)
763 || (offset == rhs.offset
764 && items == rhs.items));
765 }
766
operator !=(const MutationLog::iterator& rhs)767 bool MutationLog::iterator::operator!=(const MutationLog::iterator& rhs) {
768 return ! operator==(rhs);
769 }
770
operator *()771 const MutationLogEntry* MutationLog::iterator::operator*() {
772 cb_assert(entryBuf != NULL);
773 return MutationLogEntry::newEntry(entryBuf, LOG_ENTRY_BUF_SIZE);
774 }
775
bufferBytesRemaining()776 size_t MutationLog::iterator::bufferBytesRemaining() {
777 return log->header().blockSize() - (p - buf);
778 }
779
nextBlock()780 void MutationLog::iterator::nextBlock() {
781 cb_assert(!log->isEnabled() || log->isOpen());
782 if (buf == NULL) {
783 buf = static_cast<uint8_t*>(calloc(1, log->header().blockSize()));
784 cb_assert(buf);
785 }
786 p = buf;
787
788 ssize_t bytesread = pread(log->fd(), buf, log->header().blockSize(),
789 offset);
790 if (bytesread < 1) {
791 isEnd = true;
792 return;
793 }
794 if (bytesread != (ssize_t)(log->header().blockSize())) {
795 LOG(EXTENSION_LOG_WARNING, "FATAL: too few bytes read in access log"
796 "'%s': %s", log->getLogFile().c_str(), strerror(errno));
797 throw ShortReadException();
798 }
799 offset += bytesread;
800
801 uint32_t crc32(crc32buf(buf + 2, log->header().blockSize() - 2));
802 uint16_t computed_crc16(crc32 & 0xffff);
803 uint16_t retrieved_crc16;
804 memcpy(&retrieved_crc16, buf, sizeof(retrieved_crc16));
805 retrieved_crc16 = ntohs(retrieved_crc16);
806 if (computed_crc16 != retrieved_crc16) {
807 throw CRCReadException();
808 }
809
810 memcpy(&items, buf + 2, 2);
811 items = ntohs(items);
812
813 p = p + 4;
814
815 prepItem();
816 }
817
resetCounts(size_t *items)818 void MutationLog::resetCounts(size_t *items) {
819 for (int i(0); i < MUTATION_LOG_TYPES; ++i) {
820 itemsLogged[i] = items[i];
821 }
822 }
823
824 // ----------------------------------------------------------------------
825 // Reading entries
826 // ----------------------------------------------------------------------
827
load()828 bool MutationLogHarvester::load() {
829 bool clean(false);
830 std::set<uint16_t> shouldClear;
831 for (MutationLog::iterator it(mlog.begin()); it != mlog.end(); ++it) {
832 const MutationLogEntry *le = *it;
833 ++itemsSeen[le->type()];
834 clean = false;
835
836 switch (le->type()) {
837 case ML_DEL:
838 // FALLTHROUGH
839 case ML_NEW:
840 if (vbid_set.find(le->vbucket()) != vbid_set.end()) {
841 loading[le->vbucket()][le->key()] =
842 std::make_pair(le->rowid(), le->type());
843 }
844 break;
845 case ML_COMMIT2: {
846 clean = true;
847 for (std::set<uint16_t>::iterator vit(shouldClear.begin());
848 vit != shouldClear.end(); ++vit) {
849 committed[*vit].clear();
850 }
851 shouldClear.clear();
852
853 for (std::set<uint16_t>::const_iterator vit = vbid_set.begin();
854 vit != vbid_set.end(); ++vit) {
855 uint16_t vb(*vit);
856
857 unordered_map<std::string, mutation_log_event_t>::iterator
858 copyit2;
859 for (copyit2 = loading[vb].begin();
860 copyit2 != loading[vb].end();
861 ++copyit2) {
862
863 mutation_log_event_t t = copyit2->second;
864
865 switch (t.second) {
866 case ML_NEW:
867 committed[vb][copyit2->first] = t.first;
868 break;
869 case ML_DEL:
870 committed[vb].erase(copyit2->first);
871 break;
872 default:
873 abort();
874 }
875 }
876 }
877 }
878 loading.clear();
879 break;
880 case ML_COMMIT1:
881 // nothing in particular
882 break;
883 case ML_DEL_ALL:
884 if (vbid_set.find(le->vbucket()) != vbid_set.end()) {
885 loading[le->vbucket()].clear();
886 shouldClear.insert(le->vbucket());
887 }
888 break;
889 default:
890 abort();
891 }
892 }
893 return clean;
894 }
895
apply(void *arg, mlCallback mlc)896 void MutationLogHarvester::apply(void *arg, mlCallback mlc) {
897 for (std::set<uint16_t>::const_iterator it = vbid_set.begin();
898 it != vbid_set.end(); ++it) {
899 uint16_t vb(*it);
900
901 for (unordered_map<std::string, uint64_t>::iterator it2 =
902 committed[vb].begin();
903 it2 != committed[vb].end(); ++it2) {
904 const std::string key(it2->first);
905 uint64_t rowid(it2->second);
906 if (!mlc(arg, vb, key, rowid)) { // Stop loading from an access log
907 return;
908 }
909 }
910 }
911 }
912
apply(void *arg, mlCallbackWithQueue mlc)913 void MutationLogHarvester::apply(void *arg, mlCallbackWithQueue mlc) {
914 cb_assert(engine);
915 std::vector<std::pair<std::string, uint64_t> > fetches;
916 std::set<uint16_t>::const_iterator it = vbid_set.begin();
917 for (; it != vbid_set.end(); ++it) {
918 uint16_t vb(*it);
919 RCPtr<VBucket> vbucket = engine->getEpStore()->getVBucket(vb);
920 if (!vbucket) {
921 continue;
922 }
923 unordered_map<std::string, uint64_t>::iterator it2 =
924 committed[vb].begin();
925 for (; it2 != committed[vb].end(); ++it2) {
926 // cannot use rowid from access log, so must read from hashtable
927 std::string key = it2->first;
928 StoredValue *v = NULL;
929 if ((v = vbucket->ht.find(key, false))) {
930 fetches.push_back(std::make_pair(it2->first, v->getBySeqno()));
931 }
932 }
933 if (!mlc(vb, fetches, arg)) {
934 return;
935 }
936 fetches.clear();
937 }
938 }
939
getUncommitted( std::vector<mutation_log_uncommitted_t> &uitems)940 void MutationLogHarvester::getUncommitted(
941 std::vector<mutation_log_uncommitted_t> &uitems) {
942
943 for (std::set<uint16_t>::const_iterator vit = vbid_set.begin();
944 vit != vbid_set.end(); ++vit) {
945 uint16_t vb(*vit);
946 mutation_log_uncommitted_t leftover;
947 leftover.vbucket = vb;
948
949 unordered_map<std::string, mutation_log_event_t>::iterator copyit2;
950 for (copyit2 = loading[vb].begin();
951 copyit2 != loading[vb].end();
952 ++copyit2) {
953
954 mutation_log_event_t t = copyit2->second;
955 leftover.key = copyit2->first;
956 leftover.rowid = t.first;
957 leftover.type = static_cast<mutation_log_type_t>(t.second);
958
959 uitems.push_back(leftover);
960 }
961 }
962 }
963
total()964 size_t MutationLogHarvester::total() {
965 size_t rv(0);
966 for (int i = 0; i < MUTATION_LOG_TYPES; ++i) {
967 rv += itemsSeen[i];
968 }
969 return rv;
970 }
971
972 // ----------------------------------------------------------------------
973 // Output of entries
974 // ----------------------------------------------------------------------
975
operator <<(std::ostream &out, const MutationLogEntry &mle)976 std::ostream& operator <<(std::ostream &out, const MutationLogEntry &mle) {
977 out << "{MutationLogEntry rowid=" << mle.rowid()
978 << ", vbucket=" << mle.vbucket()
979 << ", magic=0x" << std::hex << static_cast<uint16_t>(mle.magic)
980 << std::dec
981 << ", type=" << logType(mle.type())
982 << ", key=``" << mle.key() << "''";
983 return out;
984 }
985