1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2010 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #include "config.h"
19 
20 #include <cstdlib>
21 #include <iostream>
22 #include <limits>
23 #include <list>
24 #include <string>
25 #include <utility>
26 
27 #include "common.h"
28 #include "ep.h"
29 #include "ep_engine.h"
30 #include "item_pager.h"
31 #include "connmap.h"
32 
/**
 * Disk write queue length at or above which a pausable PagingVisitor
 * reports that it wants to pause (PagingVisitor::pauseVisitor()).
 */
static const size_t MAX_PERSISTENCE_QUEUE_SIZE = 1000 * 1000;
34 
35 /**
36  * As part of the ItemPager, visit all of the objects in memory and
37  * eject some within a constrained probability
38  */
39 class PagingVisitor : public VBucketVisitor {
40 public:
41 
42     /**
43      * Construct a PagingVisitor that will attempt to evict the given
44      * percentage of objects.
45      *
46      * @param s the store that will handle the bulk removal
47      * @param st the stats where we'll track what we've done
48      * @param pcnt percentage of objects to attempt to evict (0-1)
49      * @param sfin pointer to a bool to be set to true after run completes
50      * @param pause flag indicating if PagingVisitor can pause between vbucket
51      *              visits
52      * @param bias active vbuckets eviction probability bias multiplier (0-1)
53      * @param phase pointer to an item_pager_phase to be set
54      */
PagingVisitor(EventuallyPersistentStore &s, EPStats &st, double pcnt, bool *sfin, bool pause = false, double bias = 1, item_pager_phase *phase = NULL)55     PagingVisitor(EventuallyPersistentStore &s, EPStats &st, double pcnt,
56                   bool *sfin, bool pause = false,
57                   double bias = 1, item_pager_phase *phase = NULL)
58       : store(s), stats(st), percent(pcnt),
59         activeBias(bias), ejected(0),
60         startTime(ep_real_time()), stateFinalizer(sfin), canPause(pause),
61         completePhase(true), wasHighMemoryUsage(s.isMemoryUsageTooHigh()),
62         pager_phase(phase) {}
63 
visit(StoredValue *v)64     void visit(StoredValue *v) {
65         // Delete expired items for an active vbucket.
66         bool isExpired = (currentBucket->getState() == vbucket_state_active) &&
67             v->isExpired(startTime) && !v->isDeleted();
68         if (isExpired || v->isTempNonExistentItem() || v->isTempDeletedItem()) {
69             expired.push_back(std::make_pair(currentBucket->getId(),
70                                              v->getKey()));
71             return;
72         }
73 
74         // return if not ItemPager, which uses valid eviction percentage
75         if (percent <= 0 || !pager_phase) {
76             return;
77         }
78 
79         // always evict unreferenced items, or randomly evict referenced item
80         double r = *pager_phase == PAGING_UNREFERENCED ?
81             1 :
82             static_cast<double>(std::rand()) / static_cast<double>(RAND_MAX);
83 
84         if (*pager_phase == PAGING_UNREFERENCED &&
85             v->getNRUValue() == MAX_NRU_VALUE) {
86             doEviction(v);
87         } else if (*pager_phase == PAGING_RANDOM &&
88                    v->incrNRUValue() == MAX_NRU_VALUE &&
89                    r <= percent) {
90             doEviction(v);
91         }
92     }
93 
visitBucket(RCPtr<VBucket> &vb)94     bool visitBucket(RCPtr<VBucket> &vb) {
95         update();
96 
97         bool newCheckpointCreated = false;
98         size_t removed = vb->checkpointManager.removeClosedUnrefCheckpoints(vb,
99                                                          newCheckpointCreated);
100         stats.itemsRemovedFromCheckpoints.fetch_add(removed);
101         // If the new checkpoint is created, notify this event to the
102         // corresponding paused TAP & DCP connections.
103         if (newCheckpointCreated) {
104             store.getEPEngine().getTapConnMap().notifyVBConnections(
105                                                                    vb->getId());
106             store.getEPEngine().getDcpConnMap().notifyVBConnections(
107                                         vb->getId(),
108                                         vb->checkpointManager.getHighSeqno());
109         }
110 
111         // fast path for expiry item pager
112         if (percent <= 0 || !pager_phase) {
113             return VBucketVisitor::visitBucket(vb);
114         }
115 
116         // skip active vbuckets if active resident ratio is lower than replica
117         double current = static_cast<double>(stats.getTotalMemoryUsed());
118         double lower = static_cast<double>(stats.mem_low_wat);
119         double high = static_cast<double>(stats.mem_high_wat);
120         if (vb->getState() == vbucket_state_active && current < high &&
121             store.cachedResidentRatio.activeRatio <
122             store.cachedResidentRatio.replicaRatio)
123         {
124             return false;
125         }
126 
127         if (current > lower) {
128             double p = (current - static_cast<double>(lower)) / current;
129             adjustPercent(p, vb->getState());
130             return VBucketVisitor::visitBucket(vb);
131         } else { // stop eviction whenever memory usage is below low watermark
132             completePhase = false;
133             return false;
134         }
135     }
136 
update()137     void update() {
138         store.deleteExpiredItems(expired);
139 
140         if (numEjected() > 0) {
141             LOG(EXTENSION_LOG_INFO, "Paged out %ld values", numEjected());
142         }
143 
144         size_t num_expired = expired.size();
145         if (num_expired > 0) {
146             LOG(EXTENSION_LOG_INFO, "Purged %ld expired items", num_expired);
147         }
148 
149         ejected = 0;
150         expired.clear();
151     }
152 
pauseVisitor()153     bool pauseVisitor() {
154         size_t queueSize = stats.diskQueueSize.load();
155         return canPause && queueSize >= MAX_PERSISTENCE_QUEUE_SIZE;
156     }
157 
complete()158     void complete() {
159         update();
160         if (stateFinalizer) {
161             *stateFinalizer = true;
162         }
163 
164         if (pager_phase && completePhase) {
165             if (*pager_phase == PAGING_UNREFERENCED) {
166                 *pager_phase = PAGING_RANDOM;
167             } else {
168                 *pager_phase = PAGING_UNREFERENCED;
169             }
170         }
171 
172         // Wake up any sleeping backfill tasks if the memory usage is lowered
173         // below the high watermark as a result of checkpoint removal.
174         if (wasHighMemoryUsage && !store.isMemoryUsageTooHigh()) {
175             store.getEPEngine().getDcpConnMap().notifyBackfillManagerTasks();
176         }
177     }
178 
179     /**
180      * Get the number of items ejected during the visit.
181      */
numEjected()182     size_t numEjected() { return ejected; }
183 
184 private:
adjustPercent(double prob, vbucket_state_t state)185     void adjustPercent(double prob, vbucket_state_t state) {
186         if (state == vbucket_state_replica ||
187             state == vbucket_state_dead)
188         {
189             // replica items should have higher eviction probability
190             double p = prob*(2 - activeBias);
191             percent = p < 0.9 ? p : 0.9;
192         } else {
193             // active items have lower eviction probability
194             percent = prob*activeBias;
195         }
196     }
197 
198     void doEviction(StoredValue *v) {
199         item_eviction_policy_t policy = store.getItemEvictionPolicy();
200         std::string key = v->getKey();
201 
202         if (currentBucket->ht.unlocked_ejectItem(v, policy)) {
203             ++ejected;
204 
205             /**
206              * For FULL EVICTION MODE, add all items that are being
207              * evicted to the corresponding bloomfilter.
208              */
209             if (policy == FULL_EVICTION) {
210                 currentBucket->addToFilter(key);
211             }
212         }
213     }
214 
215     std::list<std::pair<uint16_t, std::string> > expired;
216 
217     EventuallyPersistentStore &store;
218     EPStats &stats;
219     double percent;
220     double activeBias;
221     size_t ejected;
222     time_t startTime;
223     bool *stateFinalizer;
224     bool canPause;
225     bool completePhase;
226     bool wasHighMemoryUsage;
227     item_pager_phase *pager_phase;
228 };
229 
run(void)230 bool ItemPager::run(void) {
231     EventuallyPersistentStore *store = engine->getEpStore();
232     double current = static_cast<double>(stats.getTotalMemoryUsed());
233     double upper = static_cast<double>(stats.mem_high_wat);
234     double lower = static_cast<double>(stats.mem_low_wat);
235     double sleepTime = 5;
236 
237     if (current <= lower) {
238         doEvict = false;
239     }
240 
241     if (available && ((current > upper) || doEvict)) {
242         if (store->getItemEvictionPolicy() == VALUE_ONLY) {
243             doEvict = true;
244         }
245 
246         ++stats.pagerRuns;
247 
248         double toKill = (current - static_cast<double>(lower)) / current;
249 
250         std::stringstream ss;
251         ss << "Using " << stats.getTotalMemoryUsed()
252            << " bytes of memory, paging out %0f%% of items." << std::endl;
253         LOG(EXTENSION_LOG_INFO, ss.str().c_str(), (toKill*100.0));
254 
255         // compute active vbuckets evicition bias factor
256         Configuration &cfg = engine->getConfiguration();
257         size_t activeEvictPerc = cfg.getPagerActiveVbPcnt();
258         double bias = static_cast<double>(activeEvictPerc) / 50;
259 
260         available = false;
261         shared_ptr<PagingVisitor> pv(new PagingVisitor(*store, stats, toKill,
262                                                        &available,
263                                                        false, bias, &phase));
264         store->visit(pv, "Item pager", NONIO_TASK_IDX,
265                     Priority::ItemPagerPriority);
266     }
267 
268     snooze(sleepTime);
269     return true;
270 }
271 
run(void)272 bool ExpiredItemPager::run(void) {
273     EventuallyPersistentStore *store = engine->getEpStore();
274     if (available) {
275         ++stats.expiryPagerRuns;
276 
277         available = false;
278         shared_ptr<PagingVisitor> pv(new PagingVisitor(*store, stats, -1,
279                                                        &available,
280                                                        true, 1, NULL));
281         // track spawned tasks for shutdown..
282         store->visit(pv, "Expired item remover", NONIO_TASK_IDX,
283                 Priority::ItemPagerPriority, 10);
284     }
285     snooze(sleepTime);
286     return true;
287 }
288