1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */ 2/* 3 * Copyright 2011 Couchbase, Inc 4 * 5 * Licensed under the Apache License, Version 2.0 (the "License"); 6 * you may not use this file except in compliance with the License. 7 * You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18#include "config.h" 19 20#include <sys/stat.h> 21 22#include <algorithm> 23#include <string> 24#include <utility> 25 26extern "C" { 27#include "crc32.h" 28} 29#include "ep_engine.h" 30#include "mutation_log.h" 31 32const char *mutation_log_type_names[] = { 33 "new", "del", "del_all", "commit1", "commit2", NULL 34}; 35 36 37#ifdef WIN32 38ssize_t pread(file_handle_t fd, void *buf, size_t nbyte, uint64_t offset) 39{ 40 DWORD bytesread; 41 OVERLAPPED winoffs; 42 memset(&winoffs, 0, sizeof(winoffs)); 43 winoffs.Offset = offset & 0xFFFFFFFF; 44 winoffs.OffsetHigh = (offset >> 32) & 0x7FFFFFFF; 45 if (!ReadFile(fd, buf, nbyte, &bytesread, &winoffs)) { 46 /* luckily we don't check errno so we don't need to care about that */ 47 return -1; 48 } 49 50 return bytesread; 51} 52 53ssize_t pwrite(file_handle_t fd, const void *buf, size_t nbyte, 54 uint64_t offset) 55{ 56 DWORD byteswritten; 57 OVERLAPPED winoffs; 58 memset(&winoffs, 0, sizeof(winoffs)); 59 winoffs.Offset = offset & 0xFFFFFFFF; 60 winoffs.OffsetHigh = (offset >> 32) & 0x7FFFFFFF; 61 if (!WriteFile(fd, buf, nbyte, &byteswritten, &winoffs)) { 62 /* luckily we don't check errno so we don't need to care about that */ 63 return -1; 64 } 65 66 return byteswritten; 67} 68 69static inline ssize_t doWrite(file_handle_t fd, const uint8_t *buf, 70 size_t nbytes) { 71 DWORD byteswritten; 72 if (!WriteFile(fd, buf, nbytes, &byteswritten, NULL)) { 73 /* luckily we don't check errno so we don't need to care about that */ 74 return -1; 75 } 76 77 cb_assert(GetLastError() != ERROR_IO_PENDING); 78 return byteswritten; 79} 80 81static inline int doClose(file_handle_t fd) { 82 if (CloseHandle(fd)) { 83 return 0; 84 } else { 85 return -1; 86 } 87} 88 89static inline int doFsync(file_handle_t fd) { 90 if (FlushFileBuffers(fd)) { 91 return 0; 92 } else { 93 return -1; 94 } 95} 96 97static inline std::string getErrorString(void) { 98 std::string ret; 99 char* win_msg = NULL; 100 DWORD err = GetLastError(); 101 FormatMessageA(FORMAT_MESSAGE_ALLOCATE_BUFFER | 102 FORMAT_MESSAGE_FROM_SYSTEM | 103 FORMAT_MESSAGE_IGNORE_INSERTS, 104 NULL, err, 105 MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), 106 (LPTSTR)&win_msg, 107 0, NULL); 108 ret.assign(win_msg); 109 LocalFree(win_msg); 110 return ret; 111} 112 113static int64_t SeekFile(file_handle_t fd, const std::string &fname, 114 uint64_t offset, bool end) 115{ 116 LARGE_INTEGER li; 117 li.QuadPart = offset; 118 119 if (end) { 120 li.LowPart = SetFilePointer(fd, li.LowPart, &li.HighPart, FILE_END); 121 } else { 122 li.LowPart = SetFilePointer(fd, li.LowPart, &li.HighPart, FILE_BEGIN); 123 } 124 125 if (li.LowPart == INVALID_SET_FILE_POINTER && GetLastError() != NO_ERROR) { 126 std::stringstream ss; 127 ss << "FATAL: SetFilePointer failed " << fname << ": " << 128 getErrorString(); 129 LOG(EXTENSION_LOG_WARNING, ss.str().c_str()); 130 li.QuadPart = -1; 131 } 132 133 return li.QuadPart; 134} 135 136file_handle_t OpenFile(const std::string &fname, std::string &error, 137 bool rdonly) { 138 file_handle_t fd; 139 if (rdonly) { 140 fd = CreateFile(const_cast<char*>(fname.c_str()), 141 GENERIC_READ, 142 FILE_SHARE_READ | FILE_SHARE_WRITE, 143 NULL, 144 OPEN_EXISTING, 145 FILE_ATTRIBUTE_NORMAL, 146 NULL); 147 } else { 148 fd = CreateFile(const_cast<char*>(fname.c_str()), 149 GENERIC_READ | GENERIC_WRITE, 150 FILE_SHARE_READ | FILE_SHARE_WRITE, 151 NULL, 152 OPEN_ALWAYS, 153 FILE_ATTRIBUTE_NORMAL, 154 NULL); 155 } 156 157 if (fd == INVALID_FILE_VALUE) { 158 error.assign(getErrorString()); 159 } 160 161 return fd; 162} 163 164int64_t getFileSize(file_handle_t fd) { 165 LARGE_INTEGER li; 166 if (GetFileSizeEx(fd, &li)) { 167 return li.QuadPart; 168 } 169 abort(); 170} 171 172#else 173 174static inline ssize_t doWrite(file_handle_t fd, const uint8_t *buf, 175 size_t nbytes) { 176 ssize_t ret; 177 while ((ret = write(fd, buf, nbytes)) == -1 && (errno == EINTR)) { 178 /* Retry */ 179 } 180 return ret; 181} 182 183static inline int doClose(file_handle_t fd) { 184 int ret; 185 while ((ret = close(fd)) == -1 && (errno == EINTR)) { 186 /* Retry */ 187 } 188 return ret; 189} 190 191static inline int doFsync(file_handle_t fd) { 192 int ret; 193 while ((ret = fsync(fd)) == -1 && (errno == EINTR)) { 194 /* Retry */ 195 } 196 return ret; 197} 198 199static int64_t SeekFile(file_handle_t fd, const std::string &fname, 200 uint64_t offset, bool end) 201{ 202 int64_t ret; 203 if (end) { 204 ret = lseek(fd, offset, SEEK_END); 205 } else { 206 ret = lseek(fd, offset, SEEK_SET); 207 } 208 209 if (ret < 0) { 210 LOG(EXTENSION_LOG_WARNING, "FATAL: lseek failed '%s': %s", 211 fname.c_str(), 212 strerror(errno)); 213 } 214 return ret; 215} 216 217file_handle_t OpenFile(const std::string &fname, std::string &error, 218 bool rdonly) { 219 file_handle_t fd; 220 if (rdonly) { 221 fd = ::open(const_cast<char*>(fname.c_str()), O_RDONLY); 222 } else { 223 fd = ::open(const_cast<char*>(fname.c_str()), O_RDWR | O_CREAT, 0666); 224 } 225 226 if (fd < 0) { 227 error.assign(strerror(errno)); 228 } 229 230 return fd; 231} 232 233int64_t getFileSize(file_handle_t fd) { 234 struct stat st; 235 int stat_result = fstat(fd, &st); 236 cb_assert(stat_result == 0); 237 return st.st_size; 238} 239#endif 240 241 242static void writeFully(file_handle_t fd, const uint8_t *buf, size_t nbytes) { 243 while (nbytes > 0) { 244 ssize_t written = doWrite(fd, buf, nbytes); 245 cb_assert(written >= 0); 246 247 nbytes -= written; 248 buf += written; 249 } 250} 251 252uint64_t MutationLogEntry::rowid() const { 253 return ntohll(_rowid); 254} 255 256MutationLog::MutationLog(const std::string &path, 257 const size_t bs) 258 : paddingHisto(GrowingWidthGenerator<uint32_t>(0, 8, 1.5), 32), 259 logPath(path), 260 blockSize(bs), 261 blockPos(HEADER_RESERVED), 262 file(INVALID_FILE_VALUE), 263 disabled(false), 264 entries(0), 265 entryBuffer(static_cast<uint8_t*>(calloc(MutationLogEntry::len(256), 1))), 266 blockBuffer(static_cast<uint8_t*>(calloc(bs, 1))), 267 syncConfig(DEFAULT_SYNC_CONF), 268 readOnly(false) 269{ 270 for (int ii = 0; ii < MUTATION_LOG_TYPES; ++ii) { 271 itemsLogged[ii].store(0); 272 } 273 logSize.store(0); 274 275 cb_assert(entryBuffer); 276 cb_assert(blockBuffer); 277 if (logPath == "") { 278 disabled = true; 279 } 280} 281 282MutationLog::~MutationLog() { 283 flush(); 284 close(); 285 free(entryBuffer); 286 free(blockBuffer); 287} 288 289void MutationLog::disable() { 290 if (file >= 0) { 291 close(); 292 disabled = true; 293 } 294} 295 296void MutationLog::newItem(uint16_t vbucket, const std::string &key, 297 uint64_t rowid) { 298 if (isEnabled()) { 299 MutationLogEntry *mle = MutationLogEntry::newEntry(entryBuffer, 300 rowid, ML_NEW, 301 vbucket, key); 302 writeEntry(mle); 303 } 304} 305 306void MutationLog::delItem(uint16_t vbucket, const std::string &key) { 307 if (isEnabled()) { 308 MutationLogEntry *mle = MutationLogEntry::newEntry(entryBuffer, 309 0, ML_DEL, vbucket, 310 key); 311 writeEntry(mle); 312 } 313} 314 315void MutationLog::deleteAll(uint16_t vbucket) { 316 if (isEnabled()) { 317 MutationLogEntry *mle = MutationLogEntry::newEntry(entryBuffer, 318 0, ML_DEL_ALL, 319 vbucket, ""); 320 writeEntry(mle); 321 } 322} 323 324void MutationLog::sync() { 325 cb_assert(isOpen()); 326 BlockTimer timer(&syncTimeHisto); 327 int fsyncResult = doFsync(file); 328 cb_assert(fsyncResult != -1); 329} 330 331void MutationLog::commit1() { 332 if (isEnabled()) { 333 MutationLogEntry *mle = MutationLogEntry::newEntry(entryBuffer, 334 0, ML_COMMIT1, 0, 335 ""); 336 writeEntry(mle); 337 if ((getSyncConfig() & FLUSH_COMMIT_1) != 0) { 338 flush(); 339 } 340 if ((getSyncConfig() & SYNC_COMMIT_1) != 0) { 341 sync(); 342 } 343 } 344} 345 346void MutationLog::commit2() { 347 if (isEnabled()) { 348 MutationLogEntry *mle = MutationLogEntry::newEntry(entryBuffer, 349 0, ML_COMMIT2, 0, 350 ""); 351 writeEntry(mle); 352 if ((getSyncConfig() & FLUSH_COMMIT_2) != 0) { 353 flush(); 354 } 355 if ((getSyncConfig() & SYNC_COMMIT_2) != 0) { 356 sync(); 357 } 358 } 359} 360 361bool MutationLog::writeInitialBlock() { 362 cb_assert(!readOnly); 363 cb_assert(isEnabled()); 364 cb_assert(isOpen()); 365 headerBlock.set(blockSize); 366 367 writeFully(file, (uint8_t*)&headerBlock, sizeof(headerBlock)); 368 369 int64_t seek_result = SeekFile(file, getLogFile(), 370 std::max( 371 static_cast<uint32_t>(MIN_LOG_HEADER_SIZE), 372 headerBlock.blockSize() * headerBlock.blockCount()) 373 - 1, false); 374 if (seek_result < 0) { 375 LOG(EXTENSION_LOG_WARNING, "FATAL: lseek failed '%s': %s", 376 getLogFile().c_str(), strerror(errno)); 377 return false; 378 } 379 uint8_t zero(0); 380 writeFully(file, &zero, sizeof(zero)); 381 return true; 382} 383 384void MutationLog::readInitialBlock() { 385 cb_assert(isOpen()); 386 uint8_t buf[MIN_LOG_HEADER_SIZE]; 387 ssize_t bytesread = pread(file, buf, sizeof(buf), 0); 388 389 if (bytesread != sizeof(buf)) { 390 LOG(EXTENSION_LOG_WARNING, "FATAL: initial block read failed" 391 "'%s': %s", getLogFile().c_str(), strerror(errno)); 392 throw ShortReadException(); 393 } 394 395 headerBlock.set(buf, sizeof(buf)); 396 397 // These are reserved for future use. 398 if (headerBlock.version() != LOG_VERSION || 399 headerBlock.blockCount() != 1) { 400 std::stringstream ss; 401 ss << "HeaderBlock version/blockCount mismatch"; 402 throw ReadException(ss.str()); 403 } 404 405 blockSize = headerBlock.blockSize(); 406} 407 408void MutationLog::updateInitialBlock() { 409 cb_assert(!readOnly); 410 cb_assert(isOpen()); 411 needWriteAccess(); 412 413 uint8_t buf[MIN_LOG_HEADER_SIZE]; 414 memset(buf, 0, sizeof(buf)); 415 memcpy(buf, (uint8_t*)&headerBlock, sizeof(headerBlock)); 416 417 ssize_t byteswritten = pwrite(file, buf, sizeof(buf), 0); 418 419 // @todo we need a write exception 420 if (byteswritten != sizeof(buf)) { 421 throw WriteException("Failed to update header block"); 422 } 423} 424 425bool MutationLog::prepareWrites() { 426 if (isEnabled()) { 427 cb_assert(isOpen()); 428 int64_t seek_result = SeekFile(file, getLogFile(), 0, true); 429 if (seek_result < 0) { 430 return false; 431 } 432 int64_t unaligned_bytes = seek_result % blockSize; 433 if (unaligned_bytes != 0) { 434 LOG(EXTENSION_LOG_WARNING, 435 "WARNING: filesize %d not block aligned", seek_result, 436 "'%s': %s", getLogFile().c_str(), strerror(errno)); 437 if (blockSize < (size_t)seek_result) { 438 if (SeekFile(file, getLogFile(), 439 seek_result - unaligned_bytes, false) < 0) { 440 LOG(EXTENSION_LOG_WARNING, "FATAL: lseek failed '%s': %s", 441 getLogFile().c_str(), strerror(errno)); 442 return false; 443 } 444 } else { 445 throw ShortReadException(); 446 } 447 } 448 logSize = static_cast<size_t>(seek_result); 449 } 450 return true; 451} 452 453static uint8_t parseConfigString(const std::string &s) { 454 uint8_t rv(0); 455 if (s == "off") { 456 rv = 0; 457 } else if (s == "commit1") { 458 rv = 1; 459 } else if (s == "commit2") { 460 rv = 2; 461 } else if (s == "full") { 462 rv = 3; 463 } else { 464 rv = 0xff; 465 } 466 return rv; 467} 468 469bool MutationLog::setSyncConfig(const std::string &s) { 470 uint8_t v(parseConfigString(s)); 471 if (v != 0xff) { 472 syncConfig = (syncConfig & ~SYNC_FULL) | v; 473 } 474 return v != 0xff; 475} 476 477bool MutationLog::setFlushConfig(const std::string &s) { 478 uint8_t v(parseConfigString(s)); 479 if (v != 0xff) { 480 syncConfig = (syncConfig & ~FLUSH_FULL) | (v << 2); 481 } 482 return v != 0xff; 483} 484 485bool MutationLog::exists() const { 486 return access(logPath.c_str(), F_OK) == 0; 487} 488 489void MutationLog::open(bool _readOnly) { 490 if (!isEnabled()) { 491 return; 492 } 493 readOnly = _readOnly; 494 std::string error; 495 if (readOnly) { 496 if (!exists()) { 497 throw FileNotFoundException(logPath); 498 } 499 file = OpenFile(logPath, error, true); 500 } else { 501 file = OpenFile(logPath, error, false); 502 } 503 504 if (file == INVALID_FILE_VALUE) { 505 std::stringstream ss; 506 ss << "Unable to open log file: " << error; // strerror(errno); 507 throw ReadException(ss.str()); 508 } 509 510 int64_t size = getFileSize(file); 511 if (size && size < static_cast<int64_t>(MIN_LOG_HEADER_SIZE)) { 512 try { 513 LOG(EXTENSION_LOG_WARNING, "WARNING: Corrupted access log '%s'", 514 getLogFile().c_str()); 515 reset(); 516 return; 517 } catch (ShortReadException &) { 518 close(); 519 disabled = true; 520 throw ShortReadException(); 521 } 522 } 523 if (size == 0) { 524 if (!writeInitialBlock()) { 525 close(); 526 disabled = true; 527 return; 528 } 529 } else { 530 try { 531 readInitialBlock(); 532 } catch (ShortReadException &) { 533 close(); 534 disabled = true; 535 throw ShortReadException(); 536 } 537 538 if (!readOnly) { 539 headerBlock.setRdwr(1); 540 updateInitialBlock(); 541 } 542 } 543 544 if (!prepareWrites()) { 545 close(); 546 disabled = true; 547 return; 548 } 549 550 cb_assert(isOpen()); 551} 552 553void MutationLog::close() { 554 if (!isEnabled() || !isOpen()) { 555 return; 556 } 557 558 if (!readOnly) { 559 flush(); 560 sync(); 561 headerBlock.setRdwr(0); 562 updateInitialBlock(); 563 } 564 565 int close_result = doClose(file); 566 cb_assert(close_result != -1); 567 file = INVALID_FILE_VALUE; 568} 569 570bool MutationLog::reset() { 571 if (!isEnabled()) { 572 return false; 573 } 574 close(); 575 576 if (remove(getLogFile().c_str()) == -1) { 577 LOG(EXTENSION_LOG_WARNING, "FATAL: Failed to remove '%s': %s", 578 getLogFile().c_str(), strerror(errno)); 579 return false; 580 } 581 582 open(); 583 LOG(EXTENSION_LOG_INFO, "Reset a mutation log '%s' successfully.", 584 getLogFile().c_str()); 585 return true; 586} 587 588bool MutationLog::replaceWith(MutationLog &mlog) { 589 cb_assert(mlog.isEnabled()); 590 cb_assert(isEnabled()); 591 592 mlog.flush(); 593 mlog.close(); 594 flush(); 595 close(); 596 597 for (int i(0); i < MUTATION_LOG_TYPES; ++i) { 598 itemsLogged[i].store(mlog.itemsLogged[i]); 599 } 600 601 if (rename(mlog.getLogFile().c_str(), getLogFile().c_str()) != 0) { 602 open(); 603 std::stringstream ss; 604 ss << 605 "Unable to rename a mutation log \"" << mlog.getLogFile() << "\" " 606 << "to \"" << getLogFile() << "\": " << strerror(errno); 607 LOG(EXTENSION_LOG_WARNING, "%s!!! Reopened the old log file", 608 ss.str().c_str()); 609 return false; 610 } 611 612 open(); 613 LOG(EXTENSION_LOG_INFO, 614 "Renamed a mutation log \"%s\" to \"%s\" and reopened it", 615 mlog.getLogFile().c_str(), getLogFile().c_str()); 616 return true; 617} 618 619void MutationLog::flush() { 620 if (isEnabled() && blockPos > HEADER_RESERVED) { 621 cb_assert(isOpen()); 622 needWriteAccess(); 623 BlockTimer timer(&flushTimeHisto); 624 625 if (blockPos < blockSize) { 626 size_t padding(blockSize - blockPos); 627 memset(blockBuffer + blockPos, 0x00, padding); 628 paddingHisto.add(padding); 629 } 630 631 entries = htons(entries); 632 memcpy(blockBuffer + 2, &entries, sizeof(entries)); 633 634 uint32_t crc32(crc32buf(blockBuffer + 2, blockSize - 2)); 635 uint16_t crc16(htons(crc32 & 0xffff)); 636 memcpy(blockBuffer, &crc16, sizeof(crc16)); 637 638 writeFully(file, blockBuffer, blockSize); 639 logSize.fetch_add(blockSize); 640 641 blockPos = HEADER_RESERVED; 642 entries = 0; 643 } 644} 645 646void MutationLog::writeEntry(MutationLogEntry *mle) { 647 cb_assert(isEnabled()); 648 cb_assert(isOpen()); 649 needWriteAccess(); 650 651 size_t len(mle->len()); 652 if (blockPos + len > blockSize) { 653 flush(); 654 } 655 cb_assert(len < blockSize); 656 657 memcpy(blockBuffer + blockPos, mle, len); 658 blockPos += len; 659 ++entries; 660 661 ++itemsLogged[mle->type()]; 662 663 delete mle; 664} 665 666static const char* logType(uint8_t t) { 667 switch(t) { 668 case ML_NEW: 669 return "new"; 670 break; 671 case ML_DEL: 672 return "del"; 673 break; 674 case ML_DEL_ALL: 675 return "delall"; 676 break; 677 case ML_COMMIT1: 678 return "commit1"; 679 break; 680 case ML_COMMIT2: 681 return "commit2"; 682 break; 683 } 684 return "UNKNOWN"; 685} 686 687// ---------------------------------------------------------------------- 688// Mutation log iterator 689// ---------------------------------------------------------------------- 690 691MutationLog::iterator::iterator(const MutationLog *l, bool e) 692 : log(l), 693 entryBuf(NULL), 694 buf(NULL), 695 p(buf), 696 offset(l->header().blockSize() * l->header().blockCount()), 697 items(0), 698 isEnd(e) 699{ 700 cb_assert(log); 701} 702 703MutationLog::iterator::iterator(const MutationLog::iterator& mit) 704 : log(mit.log), 705 entryBuf(NULL), 706 buf(NULL), 707 p(NULL), 708 offset(mit.offset), 709 items(mit.items), 710 isEnd(mit.isEnd) 711{ 712 cb_assert(log); 713 if (mit.buf != NULL) { 714 buf = static_cast<uint8_t*>(calloc(1, log->header().blockSize())); 715 cb_assert(buf); 716 memcpy(buf, mit.buf, log->header().blockSize()); 717 p = buf + (mit.p - mit.buf); 718 } 719 720 if (mit.entryBuf != NULL) { 721 entryBuf = static_cast<uint8_t*>(calloc(1, LOG_ENTRY_BUF_SIZE)); 722 cb_assert(entryBuf); 723 memcpy(entryBuf, mit.entryBuf, LOG_ENTRY_BUF_SIZE); 724 } 725} 726 727MutationLog::iterator::~iterator() { 728 free(entryBuf); 729 free(buf); 730} 731 732void MutationLog::iterator::prepItem() { 733 MutationLogEntry *e = MutationLogEntry::newEntry(p, 734 bufferBytesRemaining()); 735 if (entryBuf == NULL) { 736 entryBuf = static_cast<uint8_t*>(calloc(1, LOG_ENTRY_BUF_SIZE)); 737 cb_assert(entryBuf); 738 } 739 memcpy(entryBuf, p, e->len()); 740} 741 742MutationLog::iterator& MutationLog::iterator::operator++() { 743 if (--items == 0) { 744 nextBlock(); 745 } else { 746 size_t l(operator*()->len()); 747 p += l; 748 749 prepItem(); 750 } 751 return *this; 752} 753 754MutationLog::iterator& MutationLog::iterator::operator++(int) { 755 abort(); 756 return *this; 757} 758 759bool MutationLog::iterator::operator==(const MutationLog::iterator& rhs) { 760 return log->fd() == rhs.log->fd() 761 && ( 762 (isEnd == rhs.isEnd) 763 || (offset == rhs.offset 764 && items == rhs.items)); 765} 766 767bool MutationLog::iterator::operator!=(const MutationLog::iterator& rhs) { 768 return ! operator==(rhs); 769} 770 771const MutationLogEntry* MutationLog::iterator::operator*() { 772 cb_assert(entryBuf != NULL); 773 return MutationLogEntry::newEntry(entryBuf, LOG_ENTRY_BUF_SIZE); 774} 775 776size_t MutationLog::iterator::bufferBytesRemaining() { 777 return log->header().blockSize() - (p - buf); 778} 779 780void MutationLog::iterator::nextBlock() { 781 cb_assert(!log->isEnabled() || log->isOpen()); 782 if (buf == NULL) { 783 buf = static_cast<uint8_t*>(calloc(1, log->header().blockSize())); 784 cb_assert(buf); 785 } 786 p = buf; 787 788 ssize_t bytesread = pread(log->fd(), buf, log->header().blockSize(), 789 offset); 790 if (bytesread < 1) { 791 isEnd = true; 792 return; 793 } 794 if (bytesread != (ssize_t)(log->header().blockSize())) { 795 LOG(EXTENSION_LOG_WARNING, "FATAL: too few bytes read in access log" 796 "'%s': %s", log->getLogFile().c_str(), strerror(errno)); 797 throw ShortReadException(); 798 } 799 offset += bytesread; 800 801 uint32_t crc32(crc32buf(buf + 2, log->header().blockSize() - 2)); 802 uint16_t computed_crc16(crc32 & 0xffff); 803 uint16_t retrieved_crc16; 804 memcpy(&retrieved_crc16, buf, sizeof(retrieved_crc16)); 805 retrieved_crc16 = ntohs(retrieved_crc16); 806 if (computed_crc16 != retrieved_crc16) { 807 throw CRCReadException(); 808 } 809 810 memcpy(&items, buf + 2, 2); 811 items = ntohs(items); 812 813 p = p + 4; 814 815 prepItem(); 816} 817 818void MutationLog::resetCounts(size_t *items) { 819 for (int i(0); i < MUTATION_LOG_TYPES; ++i) { 820 itemsLogged[i] = items[i]; 821 } 822} 823 824// ---------------------------------------------------------------------- 825// Reading entries 826// ---------------------------------------------------------------------- 827 828bool MutationLogHarvester::load() { 829 bool clean(false); 830 std::set<uint16_t> shouldClear; 831 for (MutationLog::iterator it(mlog.begin()); it != mlog.end(); ++it) { 832 const MutationLogEntry *le = *it; 833 ++itemsSeen[le->type()]; 834 clean = false; 835 836 switch (le->type()) { 837 case ML_DEL: 838 // FALLTHROUGH 839 case ML_NEW: 840 if (vbid_set.find(le->vbucket()) != vbid_set.end()) { 841 loading[le->vbucket()][le->key()] = 842 std::make_pair(le->rowid(), le->type()); 843 } 844 break; 845 case ML_COMMIT2: { 846 clean = true; 847 for (std::set<uint16_t>::iterator vit(shouldClear.begin()); 848 vit != shouldClear.end(); ++vit) { 849 committed[*vit].clear(); 850 } 851 shouldClear.clear(); 852 853 for (std::set<uint16_t>::const_iterator vit = vbid_set.begin(); 854 vit != vbid_set.end(); ++vit) { 855 uint16_t vb(*vit); 856 857 unordered_map<std::string, mutation_log_event_t>::iterator 858 copyit2; 859 for (copyit2 = loading[vb].begin(); 860 copyit2 != loading[vb].end(); 861 ++copyit2) { 862 863 mutation_log_event_t t = copyit2->second; 864 865 switch (t.second) { 866 case ML_NEW: 867 committed[vb][copyit2->first] = t.first; 868 break; 869 case ML_DEL: 870 committed[vb].erase(copyit2->first); 871 break; 872 default: 873 abort(); 874 } 875 } 876 } 877 } 878 loading.clear(); 879 break; 880 case ML_COMMIT1: 881 // nothing in particular 882 break; 883 case ML_DEL_ALL: 884 if (vbid_set.find(le->vbucket()) != vbid_set.end()) { 885 loading[le->vbucket()].clear(); 886 shouldClear.insert(le->vbucket()); 887 } 888 break; 889 default: 890 abort(); 891 } 892 } 893 return clean; 894} 895 896void MutationLogHarvester::apply(void *arg, mlCallback mlc) { 897 for (std::set<uint16_t>::const_iterator it = vbid_set.begin(); 898 it != vbid_set.end(); ++it) { 899 uint16_t vb(*it); 900 901 for (unordered_map<std::string, uint64_t>::iterator it2 = 902 committed[vb].begin(); 903 it2 != committed[vb].end(); ++it2) { 904 const std::string key(it2->first); 905 uint64_t rowid(it2->second); 906 if (!mlc(arg, vb, key, rowid)) { // Stop loading from an access log 907 return; 908 } 909 } 910 } 911} 912 913void MutationLogHarvester::apply(void *arg, mlCallbackWithQueue mlc) { 914 cb_assert(engine); 915 std::vector<std::pair<std::string, uint64_t> > fetches; 916 std::set<uint16_t>::const_iterator it = vbid_set.begin(); 917 for (; it != vbid_set.end(); ++it) { 918 uint16_t vb(*it); 919 RCPtr<VBucket> vbucket = engine->getEpStore()->getVBucket(vb); 920 if (!vbucket) { 921 continue; 922 } 923 unordered_map<std::string, uint64_t>::iterator it2 = 924 committed[vb].begin(); 925 for (; it2 != committed[vb].end(); ++it2) { 926 // cannot use rowid from access log, so must read from hashtable 927 std::string key = it2->first; 928 StoredValue *v = NULL; 929 if ((v = vbucket->ht.find(key, false))) { 930 fetches.push_back(std::make_pair(it2->first, v->getBySeqno())); 931 } 932 } 933 if (!mlc(vb, fetches, arg)) { 934 return; 935 } 936 fetches.clear(); 937 } 938} 939 940void MutationLogHarvester::getUncommitted( 941 std::vector<mutation_log_uncommitted_t> &uitems) { 942 943 for (std::set<uint16_t>::const_iterator vit = vbid_set.begin(); 944 vit != vbid_set.end(); ++vit) { 945 uint16_t vb(*vit); 946 mutation_log_uncommitted_t leftover; 947 leftover.vbucket = vb; 948 949 unordered_map<std::string, mutation_log_event_t>::iterator copyit2; 950 for (copyit2 = loading[vb].begin(); 951 copyit2 != loading[vb].end(); 952 ++copyit2) { 953 954 mutation_log_event_t t = copyit2->second; 955 leftover.key = copyit2->first; 956 leftover.rowid = t.first; 957 leftover.type = static_cast<mutation_log_type_t>(t.second); 958 959 uitems.push_back(leftover); 960 } 961 } 962} 963 964size_t MutationLogHarvester::total() { 965 size_t rv(0); 966 for (int i = 0; i < MUTATION_LOG_TYPES; ++i) { 967 rv += itemsSeen[i]; 968 } 969 return rv; 970} 971 972// ---------------------------------------------------------------------- 973// Output of entries 974// ---------------------------------------------------------------------- 975 976std::ostream& operator <<(std::ostream &out, const MutationLogEntry &mle) { 977 out << "{MutationLogEntry rowid=" << mle.rowid() 978 << ", vbucket=" << mle.vbucket() 979 << ", magic=0x" << std::hex << static_cast<uint16_t>(mle.magic) 980 << std::dec 981 << ", type=" << logType(mle.type()) 982 << ", key=``" << mle.key() << "''"; 983 return out; 984} 985