1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3 * Copyright 2012 Couchbase, Inc
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 #include "config.h"
19
20 #include <iostream>
21
22 #include "access_scanner.h"
23 #include "ep_engine.h"
24 #include "mutation_log.h"
25
26 class ItemAccessVisitor : public VBucketVisitor {
27 public:
ItemAccessVisitor(EventuallyPersistentStore &_store, EPStats &_stats, uint16_t sh, bool *sfin, AccessScanner *aS)28 ItemAccessVisitor(EventuallyPersistentStore &_store, EPStats &_stats,
29 uint16_t sh, bool *sfin, AccessScanner *aS) :
30 store(_store), stats(_stats), startTime(ep_real_time()),
31 shardID(sh), stateFinalizer(sfin), as(aS)
32 {
33 Configuration &conf = store.getEPEngine().getConfiguration();
34 name = conf.getAlogPath();
35 std::stringstream s;
36 s << shardID;
37 name = name + "." + s.str();
38 prev = name + ".old";
39 next = name + ".next";
40
41 log = new MutationLog(next, conf.getAlogBlockSize());
42 cb_assert(log != NULL);
43 log->open();
44 if (!log->isOpen()) {
45 LOG(EXTENSION_LOG_WARNING, "FATAL: Failed to open access log: %s",
46 next.c_str());
47 delete log;
48 log = NULL;
49 }
50 }
51
visit(StoredValue *v)52 void visit(StoredValue *v) {
53 if (log != NULL && v->isResident()) {
54 if (v->isExpired(startTime) || v->isDeleted()) {
55 LOG(EXTENSION_LOG_INFO,
56 "INFO: Skipping expired/deleted item: %s",v->getKey().c_str());
57 } else {
58 accessed.push_back(std::make_pair(v->getBySeqno(), v->getKey()));
59 }
60 }
61 }
62
update()63 void update() {
64 if (log != NULL) {
65 std::list<std::pair<uint64_t, std::string> >::iterator it;
66 for (it = accessed.begin(); it != accessed.end(); ++it) {
67 log->newItem(currentBucket->getId(), it->second, it->first);
68 }
69 }
70 accessed.clear();
71 }
72
visitBucket(RCPtr<VBucket> &vb)73 bool visitBucket(RCPtr<VBucket> &vb) {
74 update();
75
76 if (log == NULL) {
77 return false;
78 }
79
80 return VBucketVisitor::visitBucket(vb);
81 }
82
complete()83 virtual void complete() {
84 update();
85
86 if (stateFinalizer) {
87 if (++(as->completedCount) == store.getVBuckets().getNumShards()) {
88 *stateFinalizer = true;
89 }
90 }
91
92 if (log != NULL) {
93 size_t num_items = log->itemsLogged[ML_NEW];
94 log->commit1();
95 log->commit2();
96 delete log;
97 log = NULL;
98 ++stats.alogRuns;
99 stats.alogRuntime.store(ep_real_time() - startTime);
100 stats.alogNumItems.store(num_items);
101
102 if (num_items == 0) {
103 LOG(EXTENSION_LOG_INFO, "The new access log is empty. "
104 "Delete it without replacing the current access log...\n");
105 remove(next.c_str());
106 return;
107 }
108
109 if (access(prev.c_str(), F_OK) == 0 && remove(prev.c_str()) == -1){
110 LOG(EXTENSION_LOG_WARNING, "FATAL: Failed to remove '%s': %s",
111 prev.c_str(), strerror(errno));
112 remove(next.c_str());
113 } else if (access(name.c_str(), F_OK) == 0 && rename(name.c_str(),
114 prev.c_str()) == -1){
115 LOG(EXTENSION_LOG_WARNING,
116 "FATAL: Failed to rename '%s' to '%s': %s",
117 name.c_str(), prev.c_str(), strerror(errno));
118 remove(next.c_str());
119 } else if (rename(next.c_str(), name.c_str()) == -1) {
120 LOG(EXTENSION_LOG_WARNING,
121 "FATAL: Failed to rename '%s' to '%s': %s",
122 next.c_str(), name.c_str(), strerror(errno));
123 remove(next.c_str());
124 }
125 }
126 }
127
128 private:
129 EventuallyPersistentStore &store;
130 EPStats &stats;
131 rel_time_t startTime;
132 std::string prev;
133 std::string next;
134 std::string name;
135 uint16_t shardID;
136
137 std::list<std::pair<uint64_t, std::string> > accessed;
138
139 MutationLog *log;
140 bool *stateFinalizer;
141 AccessScanner *as;
142 };
143
run()144 bool AccessScanner::run() {
145 if (available) {
146 available = false;
147 store.resetAccessScannerTasktime();
148 completedCount = 0;
149 for (size_t i = 0; i < store.getVBuckets().getNumShards(); i++) {
150 shared_ptr<ItemAccessVisitor> pv(new ItemAccessVisitor(store,
151 stats, i, &available, this));
152 shared_ptr<VBucketVisitor> vbv(pv);
153 ExTask task = new VBucketVisitorTask(&store, vbv, i,
154 "Item Access Scanner",
155 sleepTime, true);
156 ExecutorPool::get()->schedule(task, AUXIO_TASK_IDX);
157 }
158 }
159 snooze(sleepTime);
160 stats.alogTime.store(waketime.tv_sec);
161 return true;
162 }
163
getDescription()164 std::string AccessScanner::getDescription() {
165 return std::string("Generating access log");
166 }
167
startTime()168 size_t AccessScanner::startTime() {
169 Configuration &cfg = store.getEPEngine().getConfiguration();
170 return cfg.getAlogTaskTime();
171 }
172