1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2012 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #include "config.h"
19 
20 #include <iostream>
21 
22 #include "access_scanner.h"
23 #include "ep_engine.h"
24 #include "mutation_log.h"
25 
26 class ItemAccessVisitor : public VBucketVisitor {
27 public:
ItemAccessVisitor(EventuallyPersistentStore &_store, EPStats &_stats, uint16_t sh, bool *sfin, AccessScanner *aS)28     ItemAccessVisitor(EventuallyPersistentStore &_store, EPStats &_stats,
29                       uint16_t sh, bool *sfin, AccessScanner *aS) :
30         store(_store), stats(_stats), startTime(ep_real_time()),
31         shardID(sh), stateFinalizer(sfin), as(aS)
32     {
33         Configuration &conf = store.getEPEngine().getConfiguration();
34         name = conf.getAlogPath();
35         std::stringstream s;
36         s << shardID;
37         name = name + "." + s.str();
38         prev = name + ".old";
39         next = name + ".next";
40 
41         log = new MutationLog(next, conf.getAlogBlockSize());
42         cb_assert(log != NULL);
43         log->open();
44         if (!log->isOpen()) {
45             LOG(EXTENSION_LOG_WARNING, "FATAL: Failed to open access log: %s",
46                 next.c_str());
47             delete log;
48             log = NULL;
49         }
50     }
51 
visit(StoredValue *v)52     void visit(StoredValue *v) {
53         if (log != NULL && v->isResident()) {
54             if (v->isExpired(startTime) || v->isDeleted()) {
55                 LOG(EXTENSION_LOG_INFO,
56                 "INFO: Skipping expired/deleted item: %s",v->getKey().c_str());
57             } else {
58                 accessed.push_back(std::make_pair(v->getBySeqno(), v->getKey()));
59             }
60         }
61     }
62 
update()63     void update() {
64         if (log != NULL) {
65             std::list<std::pair<uint64_t, std::string> >::iterator it;
66             for (it = accessed.begin(); it != accessed.end(); ++it) {
67                 log->newItem(currentBucket->getId(), it->second, it->first);
68             }
69         }
70         accessed.clear();
71     }
72 
visitBucket(RCPtr<VBucket> &vb)73     bool visitBucket(RCPtr<VBucket> &vb) {
74         update();
75 
76         if (log == NULL) {
77             return false;
78         }
79 
80         return VBucketVisitor::visitBucket(vb);
81     }
82 
complete()83     virtual void complete() {
84         update();
85 
86         if (stateFinalizer) {
87             if (++(as->completedCount) == store.getVBuckets().getNumShards()) {
88                 *stateFinalizer = true;
89             }
90         }
91 
92         if (log != NULL) {
93             size_t num_items = log->itemsLogged[ML_NEW];
94             log->commit1();
95             log->commit2();
96             delete log;
97             log = NULL;
98             ++stats.alogRuns;
99             stats.alogRuntime.store(ep_real_time() - startTime);
100             stats.alogNumItems.store(num_items);
101 
102             if (num_items == 0) {
103                 LOG(EXTENSION_LOG_INFO, "The new access log is empty. "
104                     "Delete it without replacing the current access log...\n");
105                 remove(next.c_str());
106                 return;
107             }
108 
109             if (access(prev.c_str(), F_OK) == 0 && remove(prev.c_str()) == -1){
110                 LOG(EXTENSION_LOG_WARNING, "FATAL: Failed to remove '%s': %s",
111                     prev.c_str(), strerror(errno));
112                 remove(next.c_str());
113             } else if (access(name.c_str(), F_OK) == 0 && rename(name.c_str(),
114                                                           prev.c_str()) == -1){
115                 LOG(EXTENSION_LOG_WARNING,
116                     "FATAL: Failed to rename '%s' to '%s': %s",
117                     name.c_str(), prev.c_str(), strerror(errno));
118                 remove(next.c_str());
119             } else if (rename(next.c_str(), name.c_str()) == -1) {
120                 LOG(EXTENSION_LOG_WARNING,
121                     "FATAL: Failed to rename '%s' to '%s': %s",
122                     next.c_str(), name.c_str(), strerror(errno));
123                 remove(next.c_str());
124             }
125         }
126     }
127 
128 private:
129     EventuallyPersistentStore &store;
130     EPStats &stats;
131     rel_time_t startTime;
132     std::string prev;
133     std::string next;
134     std::string name;
135     uint16_t shardID;
136 
137     std::list<std::pair<uint64_t, std::string> > accessed;
138 
139     MutationLog *log;
140     bool *stateFinalizer;
141     AccessScanner *as;
142 };
143 
run()144 bool AccessScanner::run() {
145     if (available) {
146         available = false;
147         store.resetAccessScannerTasktime();
148         completedCount = 0;
149         for (size_t i = 0; i < store.getVBuckets().getNumShards(); i++) {
150             shared_ptr<ItemAccessVisitor> pv(new ItemAccessVisitor(store,
151                                              stats, i, &available, this));
152             shared_ptr<VBucketVisitor> vbv(pv);
153             ExTask task = new VBucketVisitorTask(&store, vbv, i,
154                                                  "Item Access Scanner",
155                                                  sleepTime, true);
156             ExecutorPool::get()->schedule(task, AUXIO_TASK_IDX);
157         }
158     }
159     snooze(sleepTime);
160     stats.alogTime.store(waketime.tv_sec);
161     return true;
162 }
163 
getDescription()164 std::string AccessScanner::getDescription() {
165     return std::string("Generating access log");
166 }
167 
startTime()168 size_t AccessScanner::startTime() {
169     Configuration &cfg = store.getEPEngine().getConfiguration();
170     return cfg.getAlogTaskTime();
171 }
172