1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2015 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #pragma once
19 
20 /**
21  * 'Mutation' Log
22  *
23  * The MutationLog is used to maintain a log of mutations which have occurred
24  * in one or more vbuckets. It only records the additions or removals of keys,
25  * and then only the key of the item (no value).
26  *
27  * The original intent of this class was to record a log in parallel with the
28  * normal couchstore snapshots, see docs/klog.org, however this has not been
29  * used since MB-7590 (March 2013).
30  *
31  * The current use of MutationLog is for the access.log. This is a slightly
32  * different use-case - periodically (default daily) the AccessScanner walks
33  * each vBucket's HashTable and records the set of keys currently resident.
 * This doesn't make use of the MutationLog's commit functionality - it's
 * simply a list of keys which were resident. When we later come to read the
 * Access log during warmup there's no guarantee that the keys listed still
 * exist - the contents of the Access log are essentially just a hint /
 * suggestion.
38  *
39  */
40 
41 #include "config.h"
42 
43 #include "mutation_log_entry.h"
44 
45 #include <array>
46 #include <cstring>
47 #include <memory>
48 #include <set>
49 #include <string>
50 #include <unordered_map>
51 #include <vector>
52 
53 #include <atomic>
54 #include <platform/histogram.h>
55 #include "utility.h"
56 
/* 128 * 1024 * 1024 == 134217728 */
#define ML_BUFLEN (128 * 1024 * 1024)

/*
 * Platform-neutral spelling of the native file handle type, together with
 * the value that marks "no file".
 */
#if defined(WIN32)
using file_handle_t = HANDLE;
#define INVALID_FILE_VALUE INVALID_HANDLE_VALUE
#else
using file_handle_t = int;
#define INVALID_FILE_VALUE -1
#endif

/* Defaults picked up by MutationLogCompactorConfig's default constructor. */
/* Largest value representable in an unsigned int, widened to size_t. */
const size_t MAX_LOG_SIZE =
        static_cast<size_t>(static_cast<unsigned int>(-1));
const size_t MAX_ENTRY_RATIO = 10;
const size_t LOG_COMPACTOR_QUEUE_CAP = 500000;
const int MUTATION_LOG_COMPACTOR_FREQ = 3600;

const size_t MIN_LOG_HEADER_SIZE = 4096;
const size_t HEADER_RESERVED = 4;

/* Known log format versions; Current is an alias of the newest one. */
enum class MutationLogVersion {
    V1 = 1,
    V2 = 2,
    Current = V2
};

const size_t LOG_ENTRY_BUF_SIZE = 512;

/*
 * Bit flags combined into a single uint8_t configuration value: the two low
 * bits are the SYNC_* flags, the next two are the FLUSH_* flags.
 */
const uint8_t SYNC_COMMIT_1 = 1;
const uint8_t SYNC_COMMIT_2 = 2;
const uint8_t SYNC_FULL = SYNC_COMMIT_1 | SYNC_COMMIT_2;
const uint8_t FLUSH_COMMIT_1 = 4;
const uint8_t FLUSH_COMMIT_2 = 8;
const uint8_t FLUSH_FULL = FLUSH_COMMIT_1 | FLUSH_COMMIT_2;

const uint8_t DEFAULT_SYNC_CONF = FLUSH_COMMIT_2 | SYNC_COMMIT_2;
88 
89 /**
90  * The header block representing the first 4k (or so) of a MutationLog
91  * file.
92  */
93 class LogHeaderBlock {
94 public:
LogHeaderBlock(MutationLogVersion version = MutationLogVersion::Current)95     LogHeaderBlock(MutationLogVersion version = MutationLogVersion::Current)
96         : _version(htonl(int(version))),
97           _blockSize(0),
98           _blockCount(0),
99           _rdwr(1) {
100     }
101 
set(uint32_t bs, uint32_t bc=1)102     void set(uint32_t bs, uint32_t bc=1) {
103         _blockSize = htonl(bs);
104         _blockCount = htonl(bc);
105     }
106 
set(const std::array<uint8_t, MIN_LOG_HEADER_SIZE>& buf)107     void set(const std::array<uint8_t, MIN_LOG_HEADER_SIZE>& buf) {
108         int offset(0);
109         memcpy(&_version, buf.data() + offset, sizeof(_version));
110         offset += sizeof(_version);
111         memcpy(&_blockSize, buf.data() + offset, sizeof(_blockSize));
112         offset += sizeof(_blockSize);
113         memcpy(&_blockCount, buf.data() + offset, sizeof(_blockCount));
114         offset += sizeof(_blockCount);
115         memcpy(&_rdwr, buf.data() + offset, sizeof(_rdwr));
116     }
117 
version() const118     MutationLogVersion version() const {
119         return MutationLogVersion(ntohl(_version));
120     }
121 
blockSize() const122     uint32_t blockSize() const {
123         return ntohl(_blockSize);
124     }
125 
blockCount() const126     uint32_t blockCount() const {
127         return ntohl(_blockCount);
128     }
129 
rdwr() const130     uint32_t rdwr() const {
131         return ntohl(_rdwr);
132     }
133 
setRdwr(uint32_t nval)134     void setRdwr(uint32_t nval) {
135         _rdwr = htonl(nval);
136     }
137 
138 private:
139 
140     uint32_t _version;
141     uint32_t _blockSize;
142     uint32_t _blockCount;
143     uint32_t _rdwr;
144 };
145 
146 /**
147  * Mutation log compactor config that is used to control the scheduling of
148  * the log compactor
149  */
150 class MutationLogCompactorConfig {
151 public:
MutationLogCompactorConfig()152     MutationLogCompactorConfig() :
153         maxLogSize(MAX_LOG_SIZE), maxEntryRatio(MAX_ENTRY_RATIO),
154         queueCap(LOG_COMPACTOR_QUEUE_CAP),
155         sleepTime(MUTATION_LOG_COMPACTOR_FREQ) { }
156 
MutationLogCompactorConfig(size_t max_log_size, size_t max_entry_ratio, size_t queue_cap, size_t stime)157     MutationLogCompactorConfig(size_t max_log_size,
158                                size_t max_entry_ratio,
159                                size_t queue_cap,
160                                size_t stime) :
161         maxLogSize(max_log_size), maxEntryRatio(max_entry_ratio),
162         queueCap(queue_cap), sleepTime(stime) { }
163 
setMaxLogSize(size_t max_log_size)164     void setMaxLogSize(size_t max_log_size) {
165         maxLogSize = max_log_size;
166     }
167 
getMaxLogSize() const168     size_t getMaxLogSize() const {
169         return maxLogSize;
170     }
171 
setMaxEntryRatio(size_t max_entry_ratio)172     void setMaxEntryRatio(size_t max_entry_ratio) {
173         maxEntryRatio = max_entry_ratio;
174     }
175 
getMaxEntryRatio() const176     size_t getMaxEntryRatio() const {
177         return maxEntryRatio;
178     }
179 
setQueueCap(size_t queue_cap)180     void setQueueCap(size_t queue_cap) {
181         queueCap = queue_cap;
182     }
183 
getQueueCap() const184     size_t getQueueCap() const {
185         return queueCap;
186     }
187 
setSleepTime(size_t stime)188     void setSleepTime(size_t stime) {
189         sleepTime = stime;
190     }
191 
getSleepTime() const192     size_t getSleepTime() const {
193         return sleepTime;
194     }
195 
196 private:
197     size_t maxLogSize;
198     size_t maxEntryRatio;
199     size_t queueCap;
200     size_t sleepTime;
201 };
202 
203 /**
204  * The MutationLog records major key events to allow ep-engine to more
205  * quickly restore the server to its previous state upon restart.
206  */
207 class MutationLog {
208 public:
209     MutationLog(const std::string& path, const size_t bs = MIN_LOG_HEADER_SIZE);
210 
211     ~MutationLog();
212 
213     void newItem(uint16_t vbucket, const DocKey& key);
214 
215     void commit1();
216 
217     void commit2();
218 
219     bool flush();
220 
221     void sync();
222 
223     void disable();
224 
isEnabled() const225     bool isEnabled() const {
226         return !disabled;
227     }
228 
isOpen() const229     bool isOpen() const {
230         return file != INVALID_FILE_VALUE;
231     }
232 
header() const233     LogHeaderBlock header() const {
234         return headerBlock;
235     }
236 
setSyncConfig(uint8_t sconf)237     void setSyncConfig(uint8_t sconf) {
238         syncConfig = sconf;
239     }
240 
getSyncConfig() const241     uint8_t getSyncConfig() const {
242         return syncConfig & SYNC_FULL;
243     }
244 
getFlushConfig() const245     uint8_t getFlushConfig() const {
246         return syncConfig & FLUSH_FULL;
247     }
248 
getBlockSize() const249     size_t getBlockSize() const {
250         return blockSize;
251     }
252 
253     bool exists() const;
254 
getLogFile() const255     const std::string &getLogFile() const { return logPath; }
256 
257     /**
258      * Open and initialize the log.
259      *
260      * This typically happens automatically.
261      */
262     void open(bool _readOnly = false);
263 
264     /**
265      * Close the log file.
266      */
267     void close();
268 
269     /**
270      * Reset the log.
271      */
272     bool reset();
273 
274     /**
275      * Replace the current log with a given log.
276      */
277     bool replaceWith(MutationLog &mlog);
278 
279     bool setSyncConfig(const std::string &s);
280     bool setFlushConfig(const std::string &s);
281 
282     /**
283      * Reset the item type counts to the given values.
284      *
285      * This is used by the loader as part of initialization.
286      */
287     void resetCounts(size_t *);
288 
289     /**
290      * Exception thrown upon failure to write a mutation log.
291      */
292     class WriteException : public std::runtime_error {
293     public:
WriteException(const std::string &s)294         WriteException(const std::string &s) : std::runtime_error(s) {}
295     };
296 
297     /**
298      * Exception thrown upon failure to read a mutation log.
299      */
300     class ReadException : public std::runtime_error {
301     public:
ReadException(const std::string &s)302         ReadException(const std::string &s) : std::runtime_error(s) {}
303     };
304 
305     class FileNotFoundException : public ReadException {
306     public:
FileNotFoundException(const std::string &s)307         FileNotFoundException(const std::string &s) : ReadException(s) {}
308     };
309 
310     /**
311      * Exception thrown when a CRC mismatch is read in the log.
312      */
313     class CRCReadException : public ReadException {
314     public:
CRCReadException()315         CRCReadException() : ReadException("CRC Mismatch") {}
316     };
317 
318     /**
319      * Exception thrown when a short read occurred.
320      */
321     class ShortReadException : public ReadException {
322     public:
ShortReadException()323         ShortReadException() : ReadException("Short Read") {}
324     };
325 
326     /**
327      * The MutationLog::iterator will return MutationLogEntryHolder objects
328      * which handle resource destruction if necessary. In some cases the entry
329      * being read is a temporary heap allocation which will need deleting.
330      * Sometimes the entry is owned by the iterator and the iterator will
331      * sort the deletion.
332      */
333     class MutationLogEntryHolder {
334     public:
335         /**
336          * @param _mle A pointer to a buffer which contains a MutationLogEntry
337          * @param _destroy Set to true if the _mle buffer must be deleted once
338          *        the holder's life is complete.
339          */
MutationLogEntryHolder(const uint8_t* _mle, bool _destroy)340         MutationLogEntryHolder(const uint8_t* _mle, bool _destroy)
341             : mle(_mle), destroy(_destroy) {
342         }
343 
MutationLogEntryHolder(MutationLogEntryHolder&& rhs)344         MutationLogEntryHolder(MutationLogEntryHolder&& rhs)
345             : mle(rhs.mle), destroy(rhs.destroy) {
346             rhs.mle = nullptr;
347         }
348 
349         MutationLogEntryHolder(const MutationLogEntryHolder& rhs) = delete;
350 
351         /**
352          * Destructor will delete the mle data only if we're told to by the
353          * constructing code
354          */
~MutationLogEntryHolder()355         ~MutationLogEntryHolder() {
356             if (destroy) {
357                 delete[] mle;
358             }
359         }
360 
operator ->() const361         const MutationLogEntry* operator->() const {
362             return reinterpret_cast<const MutationLogEntry*>(mle);
363         }
364 
365     private:
366         const uint8_t* mle;
367         bool destroy;
368     };
369 
370     /**
371      * An iterator for the mutation log.
372      *
373      * A ReadException may be thrown at any point along iteration.
374      */
375     class iterator  : public std::iterator<std::input_iterator_tag,
376                                            const MutationLogEntry> {
377     public:
378 
379         iterator(const iterator& mit);
380 
381         iterator& operator=(const iterator& other);
382 
383         ~iterator();
384 
385         iterator& operator++();
386 
387         bool operator==(const iterator& rhs) const;
388 
389         bool operator!=(const iterator& rhs) const;
390 
391         MutationLogEntryHolder operator*();
392 
393     private:
394 
395         friend class MutationLog;
396 
397         iterator(const MutationLog* l, bool e=false);
398 
399         /// @returns the length of the entry the iterator is currently at
400         size_t getCurrentEntryLen() const;
401         void nextBlock();
402         size_t bufferBytesRemaining();
403         void prepItem();
404 
405         /**
406          * Upgrades the entry the iterator is currently at and returns it
407          * via a MutationLogEntryHolder
408          */
409         MutationLogEntryHolder upgradeEntry() const;
410 
411         const MutationLog* log;
412         std::vector<uint8_t> entryBuf;
413         std::vector<uint8_t> buf;
414         std::vector<uint8_t>::const_iterator p;
415         off_t              offset;
416         uint16_t           items;
417         bool               isEnd;
418     };
419 
420     /**
421      * An iterator pointing to the beginning of the log file.
422      */
begin()423     iterator begin() {
424         iterator it(iterator(this));
425         it.nextBlock();
426         return it;
427     }
428 
429     /**
430      * An iterator pointing at the end of the log file.
431      */
end()432     iterator end() {
433         return iterator(this, true);
434     }
435 
436     //! Items logged by type.
437     std::atomic<size_t> itemsLogged[int(MutationLogType::NumberOfTypes)];
438     //! Histogram of block padding sizes.
439     Histogram<uint32_t> paddingHisto;
440     //! Flush time histogram.
441     MicrosecondHistogram flushTimeHisto;
442     //! Sync time histogram.
443     MicrosecondHistogram syncTimeHisto;
444     //! Size of the log
445     std::atomic<size_t> logSize;
446 
447 private:
needWriteAccess(void)448     void needWriteAccess(void) {
449         if (readOnly) {
450             throw WriteException("Invalid access (file opened read only)");
451         }
452     }
453     void writeEntry(MutationLogEntry *mle);
454 
455     bool writeInitialBlock();
456     void readInitialBlock();
457     void updateInitialBlock(void);
458 
459     bool prepareWrites();
460 
fd() const461     file_handle_t fd() const { return file; }
462 
463     LogHeaderBlock     headerBlock;
464     const std::string  logPath;
465     size_t             blockSize;
466     size_t             blockPos;
467     file_handle_t      file;
468     bool               disabled;
469     uint16_t           entries;
470     std::unique_ptr<uint8_t[]> entryBuffer;
471     std::unique_ptr<uint8_t[]> blockBuffer;
472     uint8_t            syncConfig;
473     bool               readOnly;
474 
475     friend std::ostream& operator<<(std::ostream& os, const MutationLog& mlog);
476 
477     DISALLOW_COPY_AND_ASSIGN(MutationLog);
478 };
479 
480 std::ostream& operator<<(std::ostream& os, const MutationLog& mlog);
481 
482 /// @cond DETAILS
483 
484 //! rowid, (uint8_t)mutation_log_type_t
485 typedef std::pair<uint64_t, uint8_t> mutation_log_event_t;
486 
487 /// @endcond
488 
489 /**
490  * MutationLogHarvester::apply callback type.
491  */
492 typedef bool (*mlCallback)(void*, uint16_t, const DocKey&);
493 typedef bool (*mlCallbackWithQueue)(uint16_t,
494                                     const std::set<StoredDocKey>&,
495                                     void *arg);
496 
497 /**
498  * Type for mutation log leftovers.
499  */
500 struct mutation_log_uncommitted_t {
501     StoredDocKey        key;
502     uint64_t            rowid;
503     MutationLogType     type;
504     uint16_t            vbucket;
505 };
506 
507 class EventuallyPersistentEngine;
508 
509 /**
510  * Read log entries back from the log to reconstruct the state.
511  */
512 class MutationLogHarvester {
513 public:
MutationLogHarvester(MutationLog &ml, EventuallyPersistentEngine *e = NULL)514     MutationLogHarvester(MutationLog &ml, EventuallyPersistentEngine *e = NULL) :
515         mlog(ml), engine(e)
516     {
517         memset(itemsSeen, 0, sizeof(itemsSeen));
518     }
519 
520     /**
521      * Set a vbucket before loading.
522      */
setVBucket(uint16_t vb)523     void setVBucket(uint16_t vb) {
524         vbid_set.insert(vb);
525     }
526 
527     /**
528      * Load the entries from the file.
529      *
530      * @return true if the file was clean and can likely be trusted.
531      */
532     bool load();
533 
534     /**
535      * Load a batch of entries from the file, starting from the given iterator.
536      * Loaded entries are inserted into `committed`, which is cleared at the
537      * start of each call.
538      *
539      * @param start Iterator of where to start loading from.
540      * @param limit Limit of now many entries should be loaded. Zero means no
541      *              limit.
542      * @return iterator of where to resume in the log (if the end was not
543      *         reached), or MutationLog::iterator::end().
544      */
545     MutationLog::iterator loadBatch(const MutationLog::iterator& start,
546                                         size_t limit);
547 
548     /**
549      * Apply the processed log entries through the given function.
550      */
551     void apply(void *arg, mlCallback mlc);
552     void apply(void *arg, mlCallbackWithQueue mlc);
553 
554     /**
555      * Get the total number of entries found in the log.
556      */
557     size_t total();
558 
559     /**
560      * Get all of the counts of log entries by type.
561      */
getItemsSeen()562     size_t *getItemsSeen() {
563         return itemsSeen;
564     }
565 
566 private:
567 
568     MutationLog &mlog;
569     EventuallyPersistentEngine *engine;
570     std::set<uint16_t> vbid_set;
571 
572     std::unordered_map<uint16_t, std::set<StoredDocKey>> committed;
573     std::unordered_map<uint16_t, std::set<StoredDocKey>> loading;
574     size_t itemsSeen[int(MutationLogType::NumberOfTypes)];
575 };
576