1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2010 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #include "config.h"
19 
20 #include <stdlib.h>
21 
22 #include <algorithm>
23 #include <list>
24 #include <map>
25 #include <vector>
26 #include <sstream>
27 
28 #include "flusher.h"
29 
stop(bool isForceShutdown)30 bool Flusher::stop(bool isForceShutdown) {
31     forceShutdownReceived = isForceShutdown;
32     enum flusher_state to = forceShutdownReceived ? stopped : stopping;
33     bool ret = transition_state(to);
34     wake();
35     return ret;
36 }
37 
wait(void)38 void Flusher::wait(void) {
39     hrtime_t startt(gethrtime());
40     while (_state != stopped) {
41         if (!ExecutorPool::get()->wake(taskId)) {
42             std::stringstream ss;
43             ss << "Flusher task " << taskId << " vanished!";
44             LOG(EXTENSION_LOG_WARNING, ss.str().c_str());
45             break;
46         }
47         usleep(1000);
48     }
49     hrtime_t endt(gethrtime());
50     if ((endt - startt) > 1000) {
51         LOG(EXTENSION_LOG_WARNING,  "Had to wait %s for shutdown\n",
52             hrtime2text(endt - startt).c_str());
53     }
54 }
55 
pause(void)56 bool Flusher::pause(void) {
57     return transition_state(pausing);
58 }
59 
resume(void)60 bool Flusher::resume(void) {
61     bool ret = transition_state(running);
62     wake();
63     return ret;
64 }
65 
validTransition(enum flusher_state from, enum flusher_state to)66 static bool validTransition(enum flusher_state from,
67                             enum flusher_state to)
68 {
69     // we may go to stopping from all of the stats except stopped
70     if (to == stopping) {
71         return from != stopped;
72     }
73 
74     switch (from) {
75     case initializing:
76         return (to == running);
77     case running:
78         return (to == pausing);
79     case pausing:
80         return (to == paused || to == running);
81     case paused:
82         return (to == running);
83     case stopping:
84         return (to == stopped);
85     case stopped:
86         return false;
87     }
88     // THis should be impossible (unless someone added new states)
89     abort();
90 }
91 
stateName(enum flusher_state st) const92 const char * Flusher::stateName(enum flusher_state st) const {
93     static const char * const stateNames[] = {
94         "initializing", "running", "pausing", "paused", "stopping", "stopped"
95     };
96     cb_assert(st >= initializing && st <= stopped);
97     return stateNames[st];
98 }
99 
transition_state(enum flusher_state to)100 bool Flusher::transition_state(enum flusher_state to) {
101 
102     LOG(EXTENSION_LOG_DEBUG, "Attempting transition from %s to %s",
103         stateName(_state), stateName(to));
104 
105     if (!forceShutdownReceived && !validTransition(_state, to)) {
106         LOG(EXTENSION_LOG_WARNING, "Invalid transitioning from %s to %s",
107             stateName(_state), stateName(to));
108         return false;
109     }
110 
111     LOG(EXTENSION_LOG_DEBUG, "Transitioning from %s to %s",
112         stateName(_state), stateName(to));
113 
114     _state = to;
115     return true;
116 }
117 
stateName() const118 const char * Flusher::stateName() const {
119     return stateName(_state);
120 }
121 
state() const122 enum flusher_state Flusher::state() const {
123     return _state;
124 }
125 
initialize(size_t tid)126 void Flusher::initialize(size_t tid) {
127     cb_assert(taskId == tid);
128     LOG(EXTENSION_LOG_DEBUG, "Initializing flusher");
129     transition_state(running);
130 }
131 
schedule_UNLOCKED()132 void Flusher::schedule_UNLOCKED() {
133     ExecutorPool* iom = ExecutorPool::get();
134     ExTask task = new FlusherTask(ObjectRegistry::getCurrentEngine(),
135                                   this, Priority::FlusherPriority,
136                                   shard->getId());
137     this->setTaskId(task->getId());
138     iom->schedule(task, WRITER_TASK_IDX);
139     cb_assert(taskId > 0);
140 }
141 
start()142 void Flusher::start() {
143     LockHolder lh(taskMutex);
144     if (taskId) {
145         LOG(EXTENSION_LOG_WARNING, "Double start in flusher task id %llu: %s",
146                 taskId, stateName());
147         return;
148     }
149     schedule_UNLOCKED();
150 }
151 
wake(void)152 void Flusher::wake(void) {
153     LockHolder lh(taskMutex);
154     cb_assert(taskId > 0);
155     ExecutorPool::get()->wake(taskId);
156 }
157 
step(GlobalTask *task)158 bool Flusher::step(GlobalTask *task) {
159     try {
160         switch (_state) {
161         case initializing:
162             initialize(task->getId());
163             return true;
164         case paused:
165         case pausing:
166             if (_state == pausing) {
167                 transition_state(paused);
168             }
169             // Indefinitely put task to sleep..
170             task->snooze(INT_MAX);
171             return true;
172         case running:
173             {
174                 flushVB();
175                 if (_state == running) {
176                     double tosleep = computeMinSleepTime();
177                     if (tosleep > 0) {
178                         task->snooze(tosleep);
179                     }
180                 }
181                 return true;
182             }
183         case stopping:
184             {
185                 std::stringstream ss;
186                 ss << "Shutting down flusher (Write of all dirty items)"
187                    << std::endl;
188                 LOG(EXTENSION_LOG_DEBUG, "%s", ss.str().c_str());
189             }
190             completeFlush();
191             LOG(EXTENSION_LOG_DEBUG, "Flusher stopped");
192             transition_state(stopped);
193         case stopped:
194             {
195                 LockHolder lh(taskMutex);
196                 taskId = 0;
197                 return false;
198             }
199         default:
200             LOG(EXTENSION_LOG_WARNING, "Unexpected state in flusher: %s",
201                 stateName());
202             cb_assert(false);
203         }
204     } catch(std::runtime_error &e) {
205         std::stringstream ss;
206         ss << "Exception in flusher loop: " << e.what() << std::endl;
207         LOG(EXTENSION_LOG_WARNING, "%s", ss.str().c_str());
208         cb_assert(false);
209     }
210 
211     // We should _NEVER_ get here (unless you compile with -DNDEBUG causing
212     // the assertions to be removed.. It's a bug, so we should abort and
213     // create a coredump
214     abort();
215 }
216 
completeFlush()217 void Flusher::completeFlush() {
218     while(!canSnooze()) {
219         flushVB();
220     }
221 }
222 
computeMinSleepTime()223 double Flusher::computeMinSleepTime() {
224     if (!canSnooze() || shard->highPriorityCount.load() > 0) {
225         minSleepTime = DEFAULT_MIN_SLEEP_TIME;
226         return 0;
227     }
228     minSleepTime *= 2;
229     return std::min(minSleepTime, DEFAULT_MAX_SLEEP_TIME);
230 }
231 
flushVB(void)232 void Flusher::flushVB(void) {
233     if (store->diskFlushAll && shard->getId() != EP_PRIMARY_SHARD) {
234         // another shard is doing disk flush
235         bool inverse = false;
236         pendingMutation.compare_exchange_strong(inverse, true);
237         return;
238     }
239 
240     if (lpVbs.empty()) {
241         if (hpVbs.empty()) {
242             doHighPriority = false;
243         }
244         bool inverse = true;
245         pendingMutation.compare_exchange_strong(inverse, false);
246         std::vector<int> vbs = shard->getVBucketsSortedByState();
247         std::vector<int>::iterator itr = vbs.begin();
248         for (; itr != vbs.end(); ++itr) {
249             lpVbs.push(static_cast<uint16_t>(*itr));
250         }
251     }
252 
253     if (!doHighPriority && shard->highPriorityCount.load() > 0) {
254         std::vector<int> vbs = shard->getVBuckets();
255         std::vector<int>::iterator itr = vbs.begin();
256         for (; itr != vbs.end(); ++itr) {
257             RCPtr<VBucket> vb = store->getVBucket(*itr);
258             if (vb && vb->getHighPriorityChkSize() > 0) {
259                 hpVbs.push(static_cast<uint16_t>(*itr));
260             }
261         }
262         numHighPriority = hpVbs.size();
263         if (!hpVbs.empty()) {
264             doHighPriority = true;
265         }
266     }
267 
268     if (hpVbs.empty() && lpVbs.empty()) {
269         LOG(EXTENSION_LOG_INFO, "Trying to flush but no vbucket exist");
270         return;
271     } else if (!hpVbs.empty()) {
272         uint16_t vbid = hpVbs.front();
273         hpVbs.pop();
274         if (store->flushVBucket(vbid) == RETRY_FLUSH_VBUCKET) {
275             hpVbs.push(vbid);
276         }
277     } else {
278         if (doHighPriority && --numHighPriority == 0) {
279             doHighPriority = false;
280         }
281         uint16_t vbid = lpVbs.front();
282         lpVbs.pop();
283         if (store->flushVBucket(vbid) == RETRY_FLUSH_VBUCKET) {
284             lpVbs.push(vbid);
285         }
286     }
287 }
288