1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2010 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #include "config.h"
19 
20 #include <cstdlib>
21 #include <iostream>
22 #include <limits>
23 #include <list>
24 #include <string>
25 #include <utility>
26 
27 #include "common.h"
28 #include "ep.h"
29 #include "ep_engine.h"
30 #include "item_pager.h"
31 #include "connmap.h"
32 
/**
 * Disk write-queue size (as reported by stats.diskQueueSize) at or above
 * which a pausable PagingVisitor asks to pause between vbuckets.
 */
static const size_t MAX_PERSISTENCE_QUEUE_SIZE = 1000 * 1000;
34 
35 /**
36  * As part of the ItemPager, visit all of the objects in memory and
37  * eject some within a constrained probability
38  */
39 class PagingVisitor : public VBucketVisitor {
40 public:
41 
42     /**
43      * Construct a PagingVisitor that will attempt to evict the given
44      * percentage of objects.
45      *
46      * @param s the store that will handle the bulk removal
47      * @param st the stats where we'll track what we've done
48      * @param pcnt percentage of objects to attempt to evict (0-1)
49      * @param sfin pointer to a bool to be set to true after run completes
50      * @param pause flag indicating if PagingVisitor can pause between vbucket
51      *              visits
52      * @param bias active vbuckets eviction probability bias multiplier (0-1)
53      * @param phase pointer to an item_pager_phase to be set
54      */
PagingVisitor(EventuallyPersistentStore &s, EPStats &st, double pcnt, bool *sfin, bool pause = false, double bias = 1, item_pager_phase *phase = NULL)55     PagingVisitor(EventuallyPersistentStore &s, EPStats &st, double pcnt,
56                   bool *sfin, bool pause = false,
57                   double bias = 1, item_pager_phase *phase = NULL)
58       : store(s), stats(st), percent(pcnt),
59         activeBias(bias), ejected(0), totalEjected(0),
60         totalEjectionAttempts(0),
61         startTime(ep_real_time()), stateFinalizer(sfin), canPause(pause),
62         completePhase(true), pager_phase(phase) {}
63 
visit(StoredValue *v)64     void visit(StoredValue *v) {
65         // Delete expired items for an active vbucket.
66         bool isExpired = (currentBucket->getState() == vbucket_state_active) &&
67             v->isExpired(startTime) && !v->isDeleted();
68         if (isExpired || v->isTempNonExistentItem() || v->isTempDeletedItem()) {
69             expired.push_back(std::make_pair(currentBucket->getId(),
70                                              v->getKey()));
71             return;
72         }
73 
74         // return if not ItemPager, which uses valid eviction percentage
75         if (percent <= 0 || !pager_phase) {
76             return;
77         }
78 
79         // always evict unreferenced items, or randomly evict referenced item
80         double r = *pager_phase == PAGING_UNREFERENCED ?
81             1 :
82             static_cast<double>(std::rand()) / static_cast<double>(RAND_MAX);
83 
84         if (*pager_phase == PAGING_UNREFERENCED &&
85             v->getNRUValue() == MAX_NRU_VALUE) {
86             doEviction(v);
87         } else if (*pager_phase == PAGING_RANDOM &&
88                    v->incrNRUValue() == MAX_NRU_VALUE &&
89                    r <= percent) {
90             doEviction(v);
91         }
92     }
93 
visitBucket(RCPtr<VBucket> &vb)94     bool visitBucket(RCPtr<VBucket> &vb) {
95         update();
96 
97         bool newCheckpointCreated = false;
98         size_t removed = vb->checkpointManager.removeClosedUnrefCheckpoints(vb,
99                                                          newCheckpointCreated);
100         stats.itemsRemovedFromCheckpoints.fetch_add(removed);
101         // If the new checkpoint is created, notify this event to the
102         // corresponding paused TAP & DCP connections.
103         if (newCheckpointCreated) {
104             store.getEPEngine().getTapConnMap().notifyVBConnections(
105                                                                    vb->getId());
106             store.getEPEngine().getDcpConnMap().notifyVBConnections(
107                                         vb->getId(),
108                                         vb->checkpointManager.getHighSeqno());
109         }
110 
111         // fast path for expiry item pager
112         if (percent <= 0 || !pager_phase) {
113             return VBucketVisitor::visitBucket(vb);
114         }
115 
116         // skip active vbuckets if active resident ratio is lower than replica
117         double current = static_cast<double>(stats.getTotalMemoryUsed());
118         double lower = static_cast<double>(stats.mem_low_wat);
119         double high = static_cast<double>(stats.mem_high_wat);
120         if (vb->getState() == vbucket_state_active && current < high &&
121             store.cachedResidentRatio.activeRatio <
122             store.cachedResidentRatio.replicaRatio)
123         {
124             return false;
125         }
126 
127         if (current > lower) {
128             double p = (current - static_cast<double>(lower)) / current;
129             adjustPercent(p, vb->getState());
130             return VBucketVisitor::visitBucket(vb);
131         } else { // stop eviction whenever memory usage is below low watermark
132             completePhase = false;
133             return false;
134         }
135     }
136 
update()137     void update() {
138         store.deleteExpiredItems(expired);
139 
140         if (numEjected() > 0) {
141             LOG(EXTENSION_LOG_INFO, "Paged out %ld values", numEjected());
142         }
143 
144         size_t num_expired = expired.size();
145         if (num_expired > 0) {
146             LOG(EXTENSION_LOG_INFO, "Purged %ld expired items", num_expired);
147         }
148 
149         totalEjected += (ejected + num_expired);
150         ejected = 0;
151         expired.clear();
152     }
153 
pauseVisitor()154     bool pauseVisitor() {
155         size_t queueSize = stats.diskQueueSize.load();
156         return canPause && queueSize >= MAX_PERSISTENCE_QUEUE_SIZE;
157     }
158 
complete()159     void complete() {
160         update();
161         if (stateFinalizer) {
162             *stateFinalizer = true;
163         }
164 
165         if (pager_phase && completePhase) {
166             if (*pager_phase == PAGING_UNREFERENCED) {
167                 *pager_phase = PAGING_RANDOM;
168             } else {
169                 *pager_phase = PAGING_UNREFERENCED;
170             }
171         }
172     }
173 
174     /**
175      * Get the number of items ejected during the visit.
176      */
numEjected()177     size_t numEjected() { return ejected; }
178 
179     /**
180      * Get the total number of items whose values are ejected or removed due to
181      * the expiry time.
182      */
getTotalEjected()183     size_t getTotalEjected() { return totalEjected; }
184 
185     /**
186      * Get the total number of ejection attempts.
187      */
getTotalEjectionAttempts()188     size_t getTotalEjectionAttempts() { return totalEjectionAttempts; }
189 
190 private:
adjustPercent(double prob, vbucket_state_t state)191     void adjustPercent(double prob, vbucket_state_t state) {
192         if (state == vbucket_state_replica ||
193             state == vbucket_state_dead)
194         {
195             // replica items should have higher eviction probability
196             double p = prob*(2 - activeBias);
197             percent = p < 0.9 ? p : 0.9;
198         } else {
199             // active items have lower eviction probability
200             percent = prob*activeBias;
201         }
202     }
203 
204     void doEviction(StoredValue *v) {
205         ++totalEjectionAttempts;
206         item_eviction_policy_t policy = store.getItemEvictionPolicy();
207         if (currentBucket->ht.unlocked_ejectItem(v, policy)) {
208             ++ejected;
209         }
210     }
211 
212     std::list<std::pair<uint16_t, std::string> > expired;
213 
214     EventuallyPersistentStore &store;
215     EPStats &stats;
216     double percent;
217     double activeBias;
218     size_t ejected;
219     size_t totalEjected;
220     size_t totalEjectionAttempts;
221     time_t startTime;
222     bool *stateFinalizer;
223     bool canPause;
224     bool completePhase;
225     item_pager_phase *pager_phase;
226 };
227 
run(void)228 bool ItemPager::run(void) {
229     EventuallyPersistentStore *store = engine->getEpStore();
230     double current = static_cast<double>(stats.getTotalMemoryUsed());
231     double upper = static_cast<double>(stats.mem_high_wat);
232     double lower = static_cast<double>(stats.mem_low_wat);
233     double sleepTime = 5;
234 
235     if (current <= lower) {
236         doEvict = false;
237     }
238 
239     if (available && ((current > upper) || doEvict)) {
240         if (store->getItemEvictionPolicy() == VALUE_ONLY) {
241             doEvict = true;
242         }
243 
244         ++stats.pagerRuns;
245 
246         double toKill = (current - static_cast<double>(lower)) / current;
247 
248         std::stringstream ss;
249         ss << "Using " << stats.getTotalMemoryUsed()
250            << " bytes of memory, paging out %0f%% of items." << std::endl;
251         LOG(EXTENSION_LOG_INFO, ss.str().c_str(), (toKill*100.0));
252 
253         // compute active vbuckets evicition bias factor
254         Configuration &cfg = engine->getConfiguration();
255         size_t activeEvictPerc = cfg.getPagerActiveVbPcnt();
256         double bias = static_cast<double>(activeEvictPerc) / 50;
257 
258         available = false;
259         shared_ptr<PagingVisitor> pv(new PagingVisitor(*store, stats, toKill,
260                                                        &available,
261                                                        false, bias, &phase));
262         store->visit(pv, "Item pager", NONIO_TASK_IDX,
263                     Priority::ItemPagerPriority);
264     }
265 
266     snooze(sleepTime);
267     return true;
268 }
269 
run(void)270 bool ExpiredItemPager::run(void) {
271     EventuallyPersistentStore *store = engine->getEpStore();
272     if (available) {
273         ++stats.expiryPagerRuns;
274 
275         available = false;
276         shared_ptr<PagingVisitor> pv(new PagingVisitor(*store, stats, -1,
277                                                        &available,
278                                                        true, 1, NULL));
279         // track spawned tasks for shutdown..
280         store->visit(pv, "Expired item remover", NONIO_TASK_IDX,
281                 Priority::ItemPagerPriority, 10);
282     }
283     snooze(sleepTime);
284     return true;
285 }
286