1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2015 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
#include "dcp/backfill-manager.h"
#include "bucket_logger.h"
#include "connmap.h"
#include "dcp/active_stream.h"
#include "dcp/backfill_disk.h"
#include "dcp/dcpconnmap.h"
#include "dcp/producer.h"
#include "ep_time.h"
#include "executorpool.h"
#include "kv_bucket.h"

#include <phosphor/phosphor.h>

#include <memory>
30
31static const size_t sleepTime = 1;
32
33class BackfillManagerTask : public GlobalTask {
34public:
35    BackfillManagerTask(EventuallyPersistentEngine& e,
36                        std::weak_ptr<BackfillManager> mgr,
37                        double sleeptime = 0,
38                        bool completeBeforeShutdown = false)
39        : GlobalTask(&e,
40                     TaskId::BackfillManagerTask,
41                     sleeptime,
42                     completeBeforeShutdown),
43          weak_manager(mgr) {
44    }
45
46    bool run();
47
48    std::string getDescription();
49
50    std::chrono::microseconds maxExpectedDuration();
51
52private:
53    // A weak pointer to the backfill manager which owns this
54    // task. The manager is owned by the DcpProducer, but we need to
55    // give the BackfillManagerTask access to the manager as it runs
56    // concurrently in a different thread.
57    // If the manager is deleted (by the DcpProducer) then the
58    // ManagerTask simply cancels itself and stops running.
59    std::weak_ptr<BackfillManager> weak_manager;
60};
61
62bool BackfillManagerTask::run() {
63    TRACE_EVENT0("ep-engine/task", "BackFillManagerTask");
64    // Create a new shared_ptr to the manager for the duration of this
65    // execution.
66    auto manager = weak_manager.lock();
67    if (!manager) {
68        // backfill manager no longer exists - cancel ourself and stop
69        // running.
70        cancel();
71        return false;
72    }
73
74    backfill_status_t status = manager->backfill();
75    if (status == backfill_finished) {
76        return false;
77    } else if (status == backfill_snooze) {
78        snooze(sleepTime);
79    }
80
81    if (engine->getEpStats().isShutdown) {
82        return false;
83    }
84
85    return true;
86}
87
88std::string BackfillManagerTask::getDescription() {
89    return "Backfilling items for a DCP Connection";
90}
91
92std::chrono::microseconds BackfillManagerTask::maxExpectedDuration() {
93    // Empirical evidence suggests this task runs under 300ms 99.999% of
94    // the time.
95    return std::chrono::milliseconds(300);
96}
97
98BackfillManager::BackfillManager(KVBucket& kvBucket,
99                                 BackfillTrackingIface& backfillTracker,
100                                 size_t scanByteLimit,
101                                 size_t scanItemLimit,
102                                 size_t backfillByteLimit)
103    : kvBucket(kvBucket), backfillTracker(backfillTracker), managerTask(NULL) {
104    scanBuffer.bytesRead = 0;
105    scanBuffer.itemsRead = 0;
106    scanBuffer.maxBytes = scanByteLimit;
107    scanBuffer.maxItems = scanItemLimit;
108
109    buffer.bytesRead = 0;
110    buffer.maxBytes = backfillByteLimit;
111    buffer.nextReadSize = 0;
112    buffer.full = false;
113}
114
// Convenience constructor: read the scan / backfill limits from the
// bucket configuration and delegate to the main constructor.
BackfillManager::BackfillManager(KVBucket& kvBucket,
                                 BackfillTrackingIface& backfillTracker,
                                 const Configuration& config)
    : BackfillManager(kvBucket,
                      backfillTracker,
                      config.getDcpScanByteLimit(),
                      config.getDcpScanItemLimit(),
                      config.getDcpBackfillByteLimit()) {
}
124
// Emit this manager's backfill statistics (buffer usage plus the size
// of each scheduling queue) for the given DCP connection via add_stat.
void BackfillManager::addStats(DcpProducer& conn,
                               const AddStatFn& add_stat,
                               const void* c) {
    // Lock guards the buffer counters and queues being read below.
    LockHolder lh(lock);
    conn.addStat("backfill_buffer_bytes_read", buffer.bytesRead, add_stat, c);
    conn.addStat(
            "backfill_buffer_next_read_size", buffer.nextReadSize, add_stat, c);
    conn.addStat("backfill_buffer_max_bytes", buffer.maxBytes, add_stat, c);
    conn.addStat("backfill_buffer_full", buffer.full, add_stat, c);
    conn.addStat("backfill_num_active", activeBackfills.size(), add_stat, c);
    conn.addStat(
            "backfill_num_snoozing", snoozingBackfills.size(), add_stat, c);
    conn.addStat("backfill_num_pending", pendingBackfills.size(), add_stat, c);
    conn.addStat("backfill_order", to_string(scheduleOrder), add_stat, c);
}
140
141BackfillManager::~BackfillManager() {
142    if (managerTask) {
143        managerTask->cancel();
144        managerTask.reset();
145    }
146
147    while (!activeBackfills.empty()) {
148        UniqueDCPBackfillPtr backfill = std::move(activeBackfills.front());
149        activeBackfills.pop_front();
150        backfill->cancel();
151        backfillTracker.decrNumActiveSnoozingBackfills();
152    }
153
154    while (!snoozingBackfills.empty()) {
155        UniqueDCPBackfillPtr backfill =
156                std::move((snoozingBackfills.front()).second);
157        snoozingBackfills.pop_front();
158        backfill->cancel();
159        backfillTracker.decrNumActiveSnoozingBackfills();
160    }
161
162    while (!pendingBackfills.empty()) {
163        UniqueDCPBackfillPtr backfill = std::move(pendingBackfills.front());
164        pendingBackfills.pop_front();
165        backfill->cancel();
166    }
167}
168
// Set the order in which backfills are scheduled (round-robin vs
// sequential; see BackfillManager::backfill()).
// NOTE(review): unlike the other mutators this does not take `lock`;
// presumably callers set the order before any backfill runs - confirm.
void BackfillManager::setBackfillOrder(BackfillManager::ScheduleOrder order) {
    scheduleOrder = order;
}
172
173BackfillManager::ScheduleResult BackfillManager::schedule(
174        UniqueDCPBackfillPtr backfill) {
175    LockHolder lh(lock);
176    ScheduleResult result;
177    if (backfillTracker.canAddBackfillToActiveQ()) {
178        initializingBackfills.push_back(std::move(backfill));
179        result = ScheduleResult::Active;
180    } else {
181        pendingBackfills.push_back(std::move(backfill));
182        result = ScheduleResult::Pending;
183    }
184
185    if (managerTask && !managerTask->isdead()) {
186        ExecutorPool::get()->wake(managerTask->getId());
187    } else {
188        managerTask.reset(new BackfillManagerTask(kvBucket.getEPEngine(),
189                                                  shared_from_this()));
190        ExecutorPool::get()->schedule(managerTask);
191    }
192    return result;
193}
194
195bool BackfillManager::bytesCheckAndRead(size_t bytes) {
196    LockHolder lh(lock);
197    if (scanBuffer.itemsRead >= scanBuffer.maxItems) {
198        return false;
199    }
200
201    // Always allow an item to be backfilled if the scan buffer is empty,
202    // otherwise check to see if there is room for the item.
203    if (scanBuffer.bytesRead + bytes <= scanBuffer.maxBytes ||
204        scanBuffer.bytesRead == 0) {
205        scanBuffer.bytesRead += bytes;
206    } else {
207        /* Subsequent items for this backfill will be read in next run */
208        return false;
209    }
210
211    if (buffer.bytesRead == 0 || buffer.bytesRead + bytes <= buffer.maxBytes) {
212        buffer.bytesRead += bytes;
213    } else {
214        scanBuffer.bytesRead -= bytes;
215        buffer.full = true;
216        buffer.nextReadSize = bytes;
217        return false;
218    }
219
220    scanBuffer.itemsRead++;
221
222    return true;
223}
224
225void BackfillManager::bytesForceRead(size_t bytes) {
226    LockHolder lh(lock);
227
228    /* Irrespective of the scan buffer usage and overall backfill buffer usage
229       we want to complete this backfill */
230    ++scanBuffer.itemsRead;
231    scanBuffer.bytesRead += bytes;
232    buffer.bytesRead += bytes;
233
234    if (buffer.bytesRead > buffer.maxBytes) {
235        /* Setting this flag prevents running other backfills and hence prevents
236           further increase in the memory usage.
237           Note: The current backfill will run to completion and that is desired
238                 here. */
239        buffer.full = true;
240    }
241}
242
243void BackfillManager::bytesSent(size_t bytes) {
244    LockHolder lh(lock);
245    if (bytes > buffer.bytesRead) {
246        throw std::invalid_argument("BackfillManager::bytesSent: bytes "
247                "(which is" + std::to_string(bytes) + ") is greater than "
248                "buffer.bytesRead (which is" + std::to_string(buffer.bytesRead) + ")");
249    }
250    buffer.bytesRead -= bytes;
251
252    if (buffer.full) {
253        /* We can have buffer.bytesRead > buffer.maxBytes */
254        size_t unfilledBufferSize = (buffer.maxBytes > buffer.bytesRead)
255                                            ? buffer.maxBytes - buffer.bytesRead
256                                            : buffer.maxBytes;
257
258        /* If buffer.bytesRead == 0 we want to fit the next read into the
259           backfill buffer irrespective of its size */
260        bool canFitNext = (buffer.bytesRead == 0) ||
261                          (unfilledBufferSize >= buffer.nextReadSize);
262
263        /* <= implicitly takes care of the case where
264           buffer.bytesRead == (buffer.maxBytes * 3 / 4) == 0 */
265        bool enoughCleared = buffer.bytesRead <= (buffer.maxBytes * 3 / 4);
266        if (canFitNext && enoughCleared) {
267            buffer.nextReadSize = 0;
268            buffer.full = false;
269            if (managerTask) {
270                ExecutorPool::get()->wake(managerTask->getId());
271            }
272        }
273    }
274}
275
// Execute one round of backfilling: pick the next ready backfill, run
// it once, then re-queue / retire it according to its status. Called
// repeatedly by BackfillManagerTask::run().
backfill_status_t BackfillManager::backfill() {
    std::unique_lock<std::mutex> lh(lock);

    // If no backfills remaining in any of the queues then we can
    // stop the background task and finish.
    if (initializingBackfills.empty() && activeBackfills.empty() &&
        snoozingBackfills.empty() && pendingBackfills.empty()) {
        managerTask.reset();
        return backfill_finished;
    }

    // Bucket-wide memory pressure: back off rather than read more.
    if (kvBucket.isMemoryUsageTooHigh()) {
        EP_LOG_INFO(
                "DCP backfilling task temporarily suspended "
                "because the current memory usage is too high");
        return backfill_snooze;
    }

    // Promote queued work: pending -> initializing (if the tracker has
    // capacity) and snoozing -> active (if slept long enough).
    movePendingToInitializing();
    moveSnoozingToActiveQueue();

    if (buffer.full) {
        // If the buffer is full check to make sure we don't have any backfills
        // that no longer have active streams and remove them. This prevents an
        // issue where we have dead backfills taking up buffer space.
        std::list<UniqueDCPBackfillPtr> toDelete;
        for (auto a_itr = activeBackfills.begin();
             a_itr != activeBackfills.end();) {
            if ((*a_itr)->isStreamDead()) {
                (*a_itr)->cancel();
                toDelete.push_back(std::move(*a_itr));
                a_itr = activeBackfills.erase(a_itr);
                backfillTracker.decrNumActiveSnoozingBackfills();
            } else {
                ++a_itr;
            }
        }

        // Destroy the cancelled backfills with the lock released.
        lh.unlock();
        bool reschedule = !toDelete.empty();
        while (!toDelete.empty()) {
            UniqueDCPBackfillPtr backfill = std::move(toDelete.front());
            toDelete.pop_front();
        }
        // If anything was reclaimed, run again promptly; otherwise
        // snooze until bytesSent() frees space and wakes the task.
        return reschedule ? backfill_success : backfill_snooze;
    }

    UniqueDCPBackfillPtr backfill;
    Source source;
    std::tie(backfill, source) = dequeueNextBackfill(lh);

    // If no backfills ready to run then snooze
    if (!backfill) {
        return backfill_snooze;
    }

    // Run the backfill without holding the manager lock (it may perform
    // disk I/O and will call back into bytesCheckAndRead etc.).
    lh.unlock();
    backfill_status_t status = backfill->run();
    lh.lock();

    // The scan quota only applies to a single backfill run.
    scanBuffer.bytesRead = 0;
    scanBuffer.itemsRead = 0;

    switch (status) {
        case backfill_success:
            switch (scheduleOrder) {
            case ScheduleOrder::RoundRobin:
                activeBackfills.push_back(std::move(backfill));
                break;
            case ScheduleOrder::Sequential:
                switch (source) {
                case Source::Active:
                    // If the source is active then this is the "current"
                    // backfill we are working our way through, we want
                    // to put it back on the front of the active queue to
                    // run next time.
                    activeBackfills.push_front(std::move(backfill));
                    break;
                case Source::Initializing:
                    // New - this was only run to initialise it; it should
                    // now go to the back of the active queue so the
                    // "current" Backfill can resume to completion.
                    // round-robin and sequential:
                    activeBackfills.push_back(std::move(backfill));
                    break;
                }
                break;
            }
            break;
        case backfill_finished:
            // Release the tracker slot outside the manager lock; the
            // tracker has its own synchronisation.
            lh.unlock();
            backfillTracker.decrNumActiveSnoozingBackfills();
            break;
        case backfill_snooze: {
            snoozingBackfills.emplace_back(ep_current_time(),
                                           std::move(backfill));
            break;
        }
    }

    return backfill_success;
}
378
379void BackfillManager::movePendingToInitializing() {
380    while (!pendingBackfills.empty() &&
381           backfillTracker.canAddBackfillToActiveQ()) {
382        initializingBackfills.splice(initializingBackfills.end(),
383                                     pendingBackfills,
384                                     pendingBackfills.begin());
385    }
386}
387
388void BackfillManager::moveSnoozingToActiveQueue() {
389    while (!snoozingBackfills.empty()) {
390        std::pair<rel_time_t, UniqueDCPBackfillPtr> snoozer =
391                std::move(snoozingBackfills.front());
392        snoozingBackfills.pop_front();
393        // If snoozing task is found to be sleeping for greater than
394        // allowed snoozetime, push into active queue
395        if (snoozer.first + sleepTime <= ep_current_time()) {
396            activeBackfills.push_back(std::move(snoozer.second));
397        } else {
398            // Push back the popped snoozing backfill
399            snoozingBackfills.push_back(std::move(snoozer));
400            break;
401        }
402    }
403}
404
405std::pair<UniqueDCPBackfillPtr, BackfillManager::Source>
406BackfillManager::dequeueNextBackfill(std::unique_lock<std::mutex>&) {
407    // Dequeue from initializingBackfills if non-empty, else activeBackfills.
408    auto& queue = initializingBackfills.empty() ? activeBackfills
409                                                : initializingBackfills;
410    auto source = initializingBackfills.empty() ? Source::Active
411                                                : Source::Initializing;
412    if (!queue.empty()) {
413        auto next = std::move(queue.front());
414        queue.pop_front();
415        return {std::move(next), source};
416    }
417    return {};
418}
419
// Wake the background BackfillManagerTask (if one exists) so it
// re-evaluates the backfill queues.
void BackfillManager::wakeUpTask() {
    LockHolder lh(lock);
    if (managerTask) {
        ExecutorPool::get()->wake(managerTask->getId());
    }
}
426std::string BackfillManager::to_string(BackfillManager::ScheduleOrder order) {
427    switch (order) {
428    case BackfillManager::ScheduleOrder::RoundRobin:
429        return "round-robin";
430    case BackfillManager::ScheduleOrder::Sequential:
431        return "sequential";
432    }
433    folly::assume_unreachable();
434}
435