1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3 * Copyright 2010 Couchbase, Inc
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 #include "config.h"
19
20 #include <cstdlib>
21 #include <iostream>
22 #include <limits>
23 #include <list>
24 #include <string>
25 #include <utility>
26
27 #include "common.h"
28 #include "ep.h"
29 #include "ep_engine.h"
30 #include "item_pager.h"
31 #include "connmap.h"
32
/**
 * Disk-queue depth threshold used by PagingVisitor::pauseVisitor(): a
 * visitor that is allowed to pause does so while the persistence (disk)
 * queue holds at least this many items.
 */
static const size_t MAX_PERSISTENCE_QUEUE_SIZE = 1000 * 1000;
34
35 /**
36 * As part of the ItemPager, visit all of the objects in memory and
37 * eject some within a constrained probability
38 */
39 class PagingVisitor : public VBucketVisitor {
40 public:
41
42 /**
43 * Construct a PagingVisitor that will attempt to evict the given
44 * percentage of objects.
45 *
46 * @param s the store that will handle the bulk removal
47 * @param st the stats where we'll track what we've done
48 * @param pcnt percentage of objects to attempt to evict (0-1)
49 * @param sfin pointer to a bool to be set to true after run completes
50 * @param pause flag indicating if PagingVisitor can pause between vbucket
51 * visits
52 * @param bias active vbuckets eviction probability bias multiplier (0-1)
53 * @param phase pointer to an item_pager_phase to be set
54 */
PagingVisitor(EventuallyPersistentStore &s, EPStats &st, double pcnt, bool *sfin, bool pause = false, double bias = 1, item_pager_phase *phase = NULL)55 PagingVisitor(EventuallyPersistentStore &s, EPStats &st, double pcnt,
56 bool *sfin, bool pause = false,
57 double bias = 1, item_pager_phase *phase = NULL)
58 : store(s), stats(st), percent(pcnt),
59 activeBias(bias), ejected(0), totalEjected(0),
60 totalEjectionAttempts(0),
61 startTime(ep_real_time()), stateFinalizer(sfin), canPause(pause),
62 completePhase(true), pager_phase(phase) {}
63
visit(StoredValue *v)64 void visit(StoredValue *v) {
65 // Delete expired items for an active vbucket.
66 bool isExpired = (currentBucket->getState() == vbucket_state_active) &&
67 v->isExpired(startTime) && !v->isDeleted();
68 if (isExpired || v->isTempNonExistentItem() || v->isTempDeletedItem()) {
69 expired.push_back(std::make_pair(currentBucket->getId(),
70 v->getKey()));
71 return;
72 }
73
74 // return if not ItemPager, which uses valid eviction percentage
75 if (percent <= 0 || !pager_phase) {
76 return;
77 }
78
79 // always evict unreferenced items, or randomly evict referenced item
80 double r = *pager_phase == PAGING_UNREFERENCED ?
81 1 :
82 static_cast<double>(std::rand()) / static_cast<double>(RAND_MAX);
83
84 if (*pager_phase == PAGING_UNREFERENCED &&
85 v->getNRUValue() == MAX_NRU_VALUE) {
86 doEviction(v);
87 } else if (*pager_phase == PAGING_RANDOM &&
88 v->incrNRUValue() == MAX_NRU_VALUE &&
89 r <= percent) {
90 doEviction(v);
91 }
92 }
93
visitBucket(RCPtr<VBucket> &vb)94 bool visitBucket(RCPtr<VBucket> &vb) {
95 update();
96
97 bool newCheckpointCreated = false;
98 size_t removed = vb->checkpointManager.removeClosedUnrefCheckpoints(vb,
99 newCheckpointCreated);
100 stats.itemsRemovedFromCheckpoints.fetch_add(removed);
101 // If the new checkpoint is created, notify this event to the
102 // corresponding paused TAP & DCP connections.
103 if (newCheckpointCreated) {
104 store.getEPEngine().getTapConnMap().notifyVBConnections(
105 vb->getId());
106 store.getEPEngine().getDcpConnMap().notifyVBConnections(
107 vb->getId(),
108 vb->checkpointManager.getHighSeqno());
109 }
110
111 // fast path for expiry item pager
112 if (percent <= 0 || !pager_phase) {
113 return VBucketVisitor::visitBucket(vb);
114 }
115
116 // skip active vbuckets if active resident ratio is lower than replica
117 double current = static_cast<double>(stats.getTotalMemoryUsed());
118 double lower = static_cast<double>(stats.mem_low_wat);
119 double high = static_cast<double>(stats.mem_high_wat);
120 if (vb->getState() == vbucket_state_active && current < high &&
121 store.cachedResidentRatio.activeRatio <
122 store.cachedResidentRatio.replicaRatio)
123 {
124 return false;
125 }
126
127 if (current > lower) {
128 double p = (current - static_cast<double>(lower)) / current;
129 adjustPercent(p, vb->getState());
130 return VBucketVisitor::visitBucket(vb);
131 } else { // stop eviction whenever memory usage is below low watermark
132 completePhase = false;
133 return false;
134 }
135 }
136
update()137 void update() {
138 store.deleteExpiredItems(expired);
139
140 if (numEjected() > 0) {
141 LOG(EXTENSION_LOG_INFO, "Paged out %ld values", numEjected());
142 }
143
144 size_t num_expired = expired.size();
145 if (num_expired > 0) {
146 LOG(EXTENSION_LOG_INFO, "Purged %ld expired items", num_expired);
147 }
148
149 totalEjected += (ejected + num_expired);
150 ejected = 0;
151 expired.clear();
152 }
153
pauseVisitor()154 bool pauseVisitor() {
155 size_t queueSize = stats.diskQueueSize.load();
156 return canPause && queueSize >= MAX_PERSISTENCE_QUEUE_SIZE;
157 }
158
complete()159 void complete() {
160 update();
161 if (stateFinalizer) {
162 *stateFinalizer = true;
163 }
164
165 if (pager_phase && completePhase) {
166 if (*pager_phase == PAGING_UNREFERENCED) {
167 *pager_phase = PAGING_RANDOM;
168 } else {
169 *pager_phase = PAGING_UNREFERENCED;
170 }
171 }
172 }
173
174 /**
175 * Get the number of items ejected during the visit.
176 */
numEjected()177 size_t numEjected() { return ejected; }
178
179 /**
180 * Get the total number of items whose values are ejected or removed due to
181 * the expiry time.
182 */
getTotalEjected()183 size_t getTotalEjected() { return totalEjected; }
184
185 /**
186 * Get the total number of ejection attempts.
187 */
getTotalEjectionAttempts()188 size_t getTotalEjectionAttempts() { return totalEjectionAttempts; }
189
190 private:
adjustPercent(double prob, vbucket_state_t state)191 void adjustPercent(double prob, vbucket_state_t state) {
192 if (state == vbucket_state_replica ||
193 state == vbucket_state_dead)
194 {
195 // replica items should have higher eviction probability
196 double p = prob*(2 - activeBias);
197 percent = p < 0.9 ? p : 0.9;
198 } else {
199 // active items have lower eviction probability
200 percent = prob*activeBias;
201 }
202 }
203
204 void doEviction(StoredValue *v) {
205 ++totalEjectionAttempts;
206 item_eviction_policy_t policy = store.getItemEvictionPolicy();
207 if (currentBucket->ht.unlocked_ejectItem(v, policy)) {
208 ++ejected;
209 }
210 }
211
212 std::list<std::pair<uint16_t, std::string> > expired;
213
214 EventuallyPersistentStore &store;
215 EPStats &stats;
216 double percent;
217 double activeBias;
218 size_t ejected;
219 size_t totalEjected;
220 size_t totalEjectionAttempts;
221 time_t startTime;
222 bool *stateFinalizer;
223 bool canPause;
224 bool completePhase;
225 item_pager_phase *pager_phase;
226 };
227
run(void)228 bool ItemPager::run(void) {
229 EventuallyPersistentStore *store = engine->getEpStore();
230 double current = static_cast<double>(stats.getTotalMemoryUsed());
231 double upper = static_cast<double>(stats.mem_high_wat);
232 double lower = static_cast<double>(stats.mem_low_wat);
233 double sleepTime = 5;
234
235 if (current <= lower) {
236 doEvict = false;
237 }
238
239 if (available && ((current > upper) || doEvict)) {
240 if (store->getItemEvictionPolicy() == VALUE_ONLY) {
241 doEvict = true;
242 }
243
244 ++stats.pagerRuns;
245
246 double toKill = (current - static_cast<double>(lower)) / current;
247
248 std::stringstream ss;
249 ss << "Using " << stats.getTotalMemoryUsed()
250 << " bytes of memory, paging out %0f%% of items." << std::endl;
251 LOG(EXTENSION_LOG_INFO, ss.str().c_str(), (toKill*100.0));
252
253 // compute active vbuckets evicition bias factor
254 Configuration &cfg = engine->getConfiguration();
255 size_t activeEvictPerc = cfg.getPagerActiveVbPcnt();
256 double bias = static_cast<double>(activeEvictPerc) / 50;
257
258 available = false;
259 shared_ptr<PagingVisitor> pv(new PagingVisitor(*store, stats, toKill,
260 &available,
261 false, bias, &phase));
262 store->visit(pv, "Item pager", NONIO_TASK_IDX,
263 Priority::ItemPagerPriority);
264 }
265
266 snooze(sleepTime);
267 return true;
268 }
269
run(void)270 bool ExpiredItemPager::run(void) {
271 EventuallyPersistentStore *store = engine->getEpStore();
272 if (available) {
273 ++stats.expiryPagerRuns;
274
275 available = false;
276 shared_ptr<PagingVisitor> pv(new PagingVisitor(*store, stats, -1,
277 &available,
278 true, 1, NULL));
279 // track spawned tasks for shutdown..
280 store->visit(pv, "Expired item remover", NONIO_TASK_IDX,
281 Priority::ItemPagerPriority, 10);
282 }
283 snooze(sleepTime);
284 return true;
285 }
286