1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2015 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #include "flusher.h"
19 
20 #include "common.h"
21 #include "ep_bucket.h"
22 #include "tasks.h"
23 
24 #include <platform/timeutils.h>
25 
26 #include <stdlib.h>
27 #include <sstream>
28 
Flusher(EPBucket* st, KVShard* k)29 Flusher::Flusher(EPBucket* st, KVShard* k)
30     : store(st),
31       _state(State::Initializing),
32       taskId(0),
33       minSleepTime(0.1),
34       forceShutdownReceived(false),
35       doHighPriority(false),
36       numHighPriority(0),
37       pendingMutation(false),
38       shard(k) {
39 }
40 
stop(bool isForceShutdown)41 bool Flusher::stop(bool isForceShutdown) {
42     forceShutdownReceived = isForceShutdown;
43     State to = forceShutdownReceived ? State::Stopped : State::Stopping;
44     bool ret = transitionState(to);
45     wake();
46     return ret;
47 }
48 
wait(void)49 void Flusher::wait(void) {
50     auto startt = ProcessClock::now();
51     while (_state != State::Stopped) {
52         if (!ExecutorPool::get()->wake(taskId)) {
53             std::stringstream ss;
54             ss << "Flusher::wait: taskId: " << taskId << " has vanished!";
55             LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
56             break;
57         }
58         usleep(1000);
59     }
60     auto endt = ProcessClock::now();
61     if ((endt - startt).count() > 1000) {
62         LOG(EXTENSION_LOG_NOTICE,
63             "Flusher::wait: had to wait %s for shutdown",
64             cb::time2text(endt - startt).c_str());
65     }
66 }
67 
pause(void)68 bool Flusher::pause(void) {
69     return transitionState(State::Pausing);
70 }
71 
resume(void)72 bool Flusher::resume(void) {
73     bool ret = transitionState(State::Running);
74     wake();
75     return ret;
76 }
77 
validTransition(State to) const78 bool Flusher::validTransition(State to) const {
79     // we may go to stopping from all of the stats except stopped
80     if (to == State::Stopping) {
81         return _state.load() != State::Stopped;
82     }
83 
84     switch (_state.load()) {
85     case State::Initializing:
86         return (to == State::Running);
87     case State::Running:
88         return (to == State::Pausing);
89     case State::Pausing:
90         return (to == State::Paused || to == State::Running);
91     case State::Paused:
92         return (to == State::Running);
93     case State::Stopping:
94         return (to == State::Stopped);
95     case State::Stopped:
96         return false;
97     }
98     throw std::logic_error(
99             "Flusher::validTransition: called with invalid "
100             "_state:" +
101             std::to_string(int(_state.load())));
102 }
103 
stateName(State st) const104 const char* Flusher::stateName(State st) const {
105     switch (st) {
106     case State::Initializing:
107         return "initializing";
108     case State::Running:
109         return "running";
110     case State::Pausing:
111         return "pausing";
112     case State::Paused:
113         return "paused";
114     case State::Stopping:
115         return "stopping";
116     case State::Stopped:
117         return "stopped";
118     }
119     throw std::logic_error(
120             "Flusher::stateName: called with invalid "
121             "state:" +
122             std::to_string(int(st)));
123 }
124 
transitionState(State to)125 bool Flusher::transitionState(State to) {
126     if (!forceShutdownReceived && !validTransition(to)) {
127         LOG(EXTENSION_LOG_WARNING,
128             "Flusher::transitionState: invalid transition _state:%s, to:%s",
129             stateName(_state),
130             stateName(to));
131         return false;
132     }
133 
134     LOG(EXTENSION_LOG_DEBUG,
135         "Flusher::transitionState: from %s to %s",
136         stateName(_state),
137         stateName(to));
138 
139     _state = to;
140     return true;
141 }
142 
stateName() const143 const char* Flusher::stateName() const {
144     return stateName(_state);
145 }
146 
initialize()147 void Flusher::initialize() {
148     LOG(EXTENSION_LOG_DEBUG, "Flusher::initialize: initializing");
149     transitionState(State::Running);
150 }
151 
schedule_UNLOCKED()152 void Flusher::schedule_UNLOCKED() {
153     ExecutorPool* iom = ExecutorPool::get();
154     ExTask task = std::make_shared<FlusherTask>(
155             ObjectRegistry::getCurrentEngine(), this, shard->getId());
156     this->setTaskId(task->getId());
157     iom->schedule(task);
158 }
159 
start()160 void Flusher::start() {
161     LockHolder lh(taskMutex);
162     if (taskId) {
163         LOG(EXTENSION_LOG_WARNING,
164             "Flusher::start: double start in flusher task id %" PRIu64 ": %s",
165             uint64_t(taskId.load()),
166             stateName());
167         return;
168     }
169     schedule_UNLOCKED();
170 }
171 
wake(void)172 void Flusher::wake(void) {
173     // taskId becomes zero if the flusher were stopped
174     if (taskId > 0) {
175         ExecutorPool::get()->wake(taskId);
176     }
177 }
178 
step(GlobalTask *task)179 bool Flusher::step(GlobalTask *task) {
180     State currentState = _state.load();
181 
182     switch (currentState) {
183     case State::Initializing:
184         if (task->getId() != taskId) {
185             throw std::invalid_argument("Flusher::step: Argument "
186                     "task->getId() (which is" + std::to_string(task->getId()) +
187                     ") does not equal member variable taskId (which is" +
188                     std::to_string(taskId.load()));
189         }
190         initialize();
191         return true;
192 
193     case State::Paused:
194     case State::Pausing:
195         if (currentState == State::Pausing) {
196             transitionState(State::Paused);
197         }
198         // Indefinitely put task to sleep..
199         task->snooze(INT_MAX);
200         return true;
201 
202     case State::Running:
203         flushVB();
204         if (_state == State::Running) {
205             double tosleep = computeMinSleepTime();
206             if (tosleep > 0) {
207                 task->snooze(tosleep);
208             }
209         }
210         return true;
211 
212     case State::Stopping:
213         LOG(EXTENSION_LOG_DEBUG,
214             "Flusher::step: stopping flusher (write of all dirty items)");
215         completeFlush();
216         LOG(EXTENSION_LOG_DEBUG, "Flusher::step: stopped");
217         transitionState(State::Stopped);
218         return false;
219 
220     case State::Stopped:
221         taskId = 0;
222         return false;
223     }
224 
225     // If we got here there was an unhandled switch case
226     throw std::logic_error("Flusher::step: invalid _state:" +
227                            std::to_string(int(currentState)));
228 }
229 
completeFlush()230 void Flusher::completeFlush() {
231     while(!canSnooze()) {
232         flushVB();
233     }
234 }
235 
computeMinSleepTime()236 double Flusher::computeMinSleepTime() {
237     if (!canSnooze() || shard->highPriorityCount.load() > 0) {
238         minSleepTime = DEFAULT_MIN_SLEEP_TIME;
239         return 0;
240     }
241     minSleepTime *= 2;
242     return std::min(minSleepTime, DEFAULT_MAX_SLEEP_TIME);
243 }
244 
flushVB(void)245 void Flusher::flushVB(void) {
246     if (store->isDeleteAllScheduled() && shard->getId() != EP_PRIMARY_SHARD) {
247         // another shard is doing disk flush
248         bool inverse = false;
249         pendingMutation.compare_exchange_strong(inverse, true);
250         return;
251     }
252 
253     // If the low-priority vBucket queue is empty, see if there's any
254     // pending mutations - and if so re-populate the low pri queue.
255     if (lpVbs.empty()) {
256         if (hpVbs.empty()) {
257             doHighPriority = false;
258         }
259         bool inverse = true;
260         if (pendingMutation.compare_exchange_strong(inverse, false)) {
261             for (auto vbid : shard->getVBucketsSortedByState()) {
262                 lpVbs.push(vbid);
263             }
264         }
265     }
266 
267     if (!doHighPriority && shard->highPriorityCount.load() > 0) {
268         for (auto vbid : shard->getVBuckets()) {
269             VBucketPtr vb = store->getVBucket(vbid);
270             if (vb && vb->getHighPriorityChkSize() > 0) {
271                 hpVbs.push(vbid);
272             }
273         }
274         numHighPriority = hpVbs.size();
275         if (!hpVbs.empty()) {
276             doHighPriority = true;
277         }
278     }
279 
280     if (hpVbs.empty() && lpVbs.empty()) {
281         LOG(EXTENSION_LOG_INFO,
282             "Flusher::flushVB: Trying to flush but no vbuckets exist");
283         return;
284     } else if (!hpVbs.empty()) {
285         uint16_t vbid = hpVbs.front();
286         hpVbs.pop();
287         if (store->flushVBucket(vbid).first) {
288             // More items still available, add vbid back to pending set.
289             hpVbs.push(vbid);
290         }
291     } else {
292         if (doHighPriority && --numHighPriority == 0) {
293             doHighPriority = false;
294         }
295         uint16_t vbid = lpVbs.front();
296         lpVbs.pop();
297         if (store->flushVBucket(vbid).first) {
298             // More items still available, add vbid back to pending set.
299             lpVbs.push(vbid);
300         }
301     }
302 }
303