1/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2/*
3 *     Copyright 2015 Couchbase, Inc
4 *
5 *   Licensed under the Apache License, Version 2.0 (the "License");
6 *   you may not use this file except in compliance with the License.
7 *   You may obtain a copy of the License at
8 *
9 *       http://www.apache.org/licenses/LICENSE-2.0
10 *
11 *   Unless required by applicable law or agreed to in writing, software
12 *   distributed under the License is distributed on an "AS IS" BASIS,
13 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *   See the License for the specific language governing permissions and
15 *   limitations under the License.
16 */
17
#include "dcp/backfill-manager.h"
#include "bucket_logger.h"
#include "connmap.h"
#include "dcp/active_stream.h"
#include "dcp/backfill_disk.h"
#include "dcp/dcpconnmap.h"
#include "dcp/producer.h"
#include "ep_time.h"
#include "executorpool.h"
#include "kv_bucket.h"

#include <phosphor/phosphor.h>

#include <memory>
#include <utility>
30
31static const size_t sleepTime = 1;
32
33class BackfillManagerTask : public GlobalTask {
34public:
35    BackfillManagerTask(EventuallyPersistentEngine& e,
36                        std::weak_ptr<BackfillManager> mgr,
37                        double sleeptime = 0,
38                        bool completeBeforeShutdown = false)
39        : GlobalTask(&e,
40                     TaskId::BackfillManagerTask,
41                     sleeptime,
42                     completeBeforeShutdown),
43          weak_manager(mgr) {
44    }
45
46    bool run();
47
48    std::string getDescription();
49
50    std::chrono::microseconds maxExpectedDuration();
51
52private:
53    // A weak pointer to the backfill manager which owns this
54    // task. The manager is owned by the DcpProducer, but we need to
55    // give the BackfillManagerTask access to the manager as it runs
56    // concurrently in a different thread.
57    // If the manager is deleted (by the DcpProducer) then the
58    // ManagerTask simply cancels itself and stops running.
59    std::weak_ptr<BackfillManager> weak_manager;
60};
61
62bool BackfillManagerTask::run() {
63    TRACE_EVENT0("ep-engine/task", "BackFillManagerTask");
64    // Create a new shared_ptr to the manager for the duration of this
65    // execution.
66    auto manager = weak_manager.lock();
67    if (!manager) {
68        // backfill manager no longer exists - cancel ourself and stop
69        // running.
70        cancel();
71        return false;
72    }
73
74    backfill_status_t status = manager->backfill();
75    if (status == backfill_finished) {
76        return false;
77    } else if (status == backfill_snooze) {
78        snooze(sleepTime);
79    }
80
81    if (engine->getEpStats().isShutdown) {
82        return false;
83    }
84
85    return true;
86}
87
// Human-readable description of this task, shown in task/stats output.
// A single BackfillManagerTask services all backfills of one DCP connection.
std::string BackfillManagerTask::getDescription() {
    return "Backfilling items for a DCP Connection";
}
91
92std::chrono::microseconds BackfillManagerTask::maxExpectedDuration() {
93    // Empirical evidence suggests this task runs under 300ms 99.999% of
94    // the time.
95    return std::chrono::milliseconds(300);
96}
97
98BackfillManager::BackfillManager(KVBucket& kvBucket,
99                                 BackfillTrackingIface& backfillTracker,
100                                 size_t scanByteLimit,
101                                 size_t scanItemLimit,
102                                 size_t backfillByteLimit)
103    : kvBucket(kvBucket), backfillTracker(backfillTracker), managerTask(NULL) {
104    scanBuffer.bytesRead = 0;
105    scanBuffer.itemsRead = 0;
106    scanBuffer.maxBytes = scanByteLimit;
107    scanBuffer.maxItems = scanItemLimit;
108
109    buffer.bytesRead = 0;
110    buffer.maxBytes = backfillByteLimit;
111    buffer.nextReadSize = 0;
112    buffer.full = false;
113}
114
// Delegating constructor: reads the scan/backfill byte and item limits from
// the bucket Configuration and forwards them to the main constructor.
BackfillManager::BackfillManager(KVBucket& kvBucket,
                                 BackfillTrackingIface& backfillTracker,
                                 const Configuration& config)
    : BackfillManager(kvBucket,
                      backfillTracker,
                      config.getDcpScanByteLimit(),
                      config.getDcpScanItemLimit(),
                      config.getDcpBackfillByteLimit()) {
}
124
125void BackfillManager::addStats(DcpProducer& conn,
126                               const AddStatFn& add_stat,
127                               const void* c) {
128    LockHolder lh(lock);
129    conn.addStat("backfill_buffer_bytes_read", buffer.bytesRead, add_stat, c);
130    conn.addStat(
131            "backfill_buffer_next_read_size", buffer.nextReadSize, add_stat, c);
132    conn.addStat("backfill_buffer_max_bytes", buffer.maxBytes, add_stat, c);
133    conn.addStat("backfill_buffer_full", buffer.full, add_stat, c);
134    conn.addStat("backfill_num_active", activeBackfills.size(), add_stat, c);
135    conn.addStat(
136            "backfill_num_snoozing", snoozingBackfills.size(), add_stat, c);
137    conn.addStat("backfill_num_pending", pendingBackfills.size(), add_stat, c);
138    conn.addStat("backfill_order", to_string(scheduleOrder), add_stat, c);
139}
140
141BackfillManager::~BackfillManager() {
142    if (managerTask) {
143        managerTask->cancel();
144        managerTask.reset();
145    }
146
147    while (!initializingBackfills.empty()) {
148        UniqueDCPBackfillPtr backfill =
149                std::move(initializingBackfills.front());
150        initializingBackfills.pop_front();
151        backfill->cancel();
152        backfillTracker.decrNumActiveSnoozingBackfills();
153    }
154
155    while (!activeBackfills.empty()) {
156        UniqueDCPBackfillPtr backfill = std::move(activeBackfills.front());
157        activeBackfills.pop_front();
158        backfill->cancel();
159        backfillTracker.decrNumActiveSnoozingBackfills();
160    }
161
162    while (!snoozingBackfills.empty()) {
163        UniqueDCPBackfillPtr backfill =
164                std::move((snoozingBackfills.front()).second);
165        snoozingBackfills.pop_front();
166        backfill->cancel();
167        backfillTracker.decrNumActiveSnoozingBackfills();
168    }
169
170    while (!pendingBackfills.empty()) {
171        UniqueDCPBackfillPtr backfill = std::move(pendingBackfills.front());
172        pendingBackfills.pop_front();
173        backfill->cancel();
174    }
175}
176
// Set the order in which backfill() services the queued backfills
// (round-robin vs sequential - see the scheduleOrder handling in backfill()).
// NOTE(review): does not acquire `lock`; presumably only called before
// backfills are scheduled / from the owning connection's thread - confirm.
void BackfillManager::setBackfillOrder(BackfillManager::ScheduleOrder order) {
    scheduleOrder = order;
}
180
181BackfillManager::ScheduleResult BackfillManager::schedule(
182        UniqueDCPBackfillPtr backfill) {
183    LockHolder lh(lock);
184    ScheduleResult result;
185    if (backfillTracker.canAddBackfillToActiveQ()) {
186        initializingBackfills.push_back(std::move(backfill));
187        result = ScheduleResult::Active;
188    } else {
189        pendingBackfills.push_back(std::move(backfill));
190        result = ScheduleResult::Pending;
191    }
192
193    if (managerTask && !managerTask->isdead()) {
194        ExecutorPool::get()->wake(managerTask->getId());
195    } else {
196        managerTask.reset(new BackfillManagerTask(kvBucket.getEPEngine(),
197                                                  shared_from_this()));
198        ExecutorPool::get()->schedule(managerTask);
199    }
200    return result;
201}
202
203bool BackfillManager::bytesCheckAndRead(size_t bytes) {
204    LockHolder lh(lock);
205    if (scanBuffer.itemsRead >= scanBuffer.maxItems) {
206        return false;
207    }
208
209    // Always allow an item to be backfilled if the scan buffer is empty,
210    // otherwise check to see if there is room for the item.
211    if (scanBuffer.bytesRead + bytes <= scanBuffer.maxBytes ||
212        scanBuffer.bytesRead == 0) {
213        scanBuffer.bytesRead += bytes;
214    } else {
215        /* Subsequent items for this backfill will be read in next run */
216        return false;
217    }
218
219    if (buffer.bytesRead == 0 || buffer.bytesRead + bytes <= buffer.maxBytes) {
220        buffer.bytesRead += bytes;
221    } else {
222        scanBuffer.bytesRead -= bytes;
223        buffer.full = true;
224        buffer.nextReadSize = bytes;
225        return false;
226    }
227
228    scanBuffer.itemsRead++;
229
230    return true;
231}
232
233void BackfillManager::bytesForceRead(size_t bytes) {
234    LockHolder lh(lock);
235
236    /* Irrespective of the scan buffer usage and overall backfill buffer usage
237       we want to complete this backfill */
238    ++scanBuffer.itemsRead;
239    scanBuffer.bytesRead += bytes;
240    buffer.bytesRead += bytes;
241
242    if (buffer.bytesRead > buffer.maxBytes) {
243        /* Setting this flag prevents running other backfills and hence prevents
244           further increase in the memory usage.
245           Note: The current backfill will run to completion and that is desired
246                 here. */
247        buffer.full = true;
248    }
249}
250
251void BackfillManager::bytesSent(size_t bytes) {
252    LockHolder lh(lock);
253    if (bytes > buffer.bytesRead) {
254        throw std::invalid_argument("BackfillManager::bytesSent: bytes "
255                "(which is" + std::to_string(bytes) + ") is greater than "
256                "buffer.bytesRead (which is" + std::to_string(buffer.bytesRead) + ")");
257    }
258    buffer.bytesRead -= bytes;
259
260    if (buffer.full) {
261        /* We can have buffer.bytesRead > buffer.maxBytes */
262        size_t unfilledBufferSize = (buffer.maxBytes > buffer.bytesRead)
263                                            ? buffer.maxBytes - buffer.bytesRead
264                                            : buffer.maxBytes;
265
266        /* If buffer.bytesRead == 0 we want to fit the next read into the
267           backfill buffer irrespective of its size */
268        bool canFitNext = (buffer.bytesRead == 0) ||
269                          (unfilledBufferSize >= buffer.nextReadSize);
270
271        /* <= implicitly takes care of the case where
272           buffer.bytesRead == (buffer.maxBytes * 3 / 4) == 0 */
273        bool enoughCleared = buffer.bytesRead <= (buffer.maxBytes * 3 / 4);
274        if (canFitNext && enoughCleared) {
275            buffer.nextReadSize = 0;
276            buffer.full = false;
277            if (managerTask) {
278                ExecutorPool::get()->wake(managerTask->getId());
279            }
280        }
281    }
282}
283
// Run one backfill "slice": tidy the queues, pick the next backfill, run it
// once, then requeue it according to its status and the schedule order.
// Called repeatedly by BackfillManagerTask::run().
// Returns backfill_finished when all queues are empty (task can stop),
// backfill_snooze when there is nothing runnable right now, otherwise
// backfill_success (task should run again).
backfill_status_t BackfillManager::backfill() {
    std::unique_lock<std::mutex> lh(lock);

    // If no backfills remaining in any of the queues then we can
    // stop the background task and finish.
    if (initializingBackfills.empty() && activeBackfills.empty() &&
        snoozingBackfills.empty() && pendingBackfills.empty()) {
        managerTask.reset();
        return backfill_finished;
    }

    if (kvBucket.isMemoryUsageTooHigh()) {
        EP_LOG_INFO(
                "DCP backfilling task temporarily suspended "
                "because the current memory usage is too high");
        return backfill_snooze;
    }

    // Promote pending backfills while the tracker has capacity, and move
    // any backfill which has snoozed long enough back onto the active queue.
    movePendingToInitializing();
    moveSnoozingToActiveQueue();

    if (buffer.full) {
        // If the buffer is full check to make sure we don't have any backfills
        // that no longer have active streams and remove them. This prevents an
        // issue where we have dead backfills taking up buffer space.
        std::list<UniqueDCPBackfillPtr> toDelete;
        for (auto a_itr = activeBackfills.begin();
             a_itr != activeBackfills.end();) {
            if ((*a_itr)->isStreamDead()) {
                (*a_itr)->cancel();
                toDelete.push_back(std::move(*a_itr));
                a_itr = activeBackfills.erase(a_itr);
                backfillTracker.decrNumActiveSnoozingBackfills();
            } else {
                ++a_itr;
            }
        }

        // Destroy the cancelled backfills outside the lock (destruction may
        // be non-trivial), then reschedule only if we actually freed any.
        lh.unlock();
        bool reschedule = !toDelete.empty();
        while (!toDelete.empty()) {
            UniqueDCPBackfillPtr backfill = std::move(toDelete.front());
            toDelete.pop_front();
        }
        return reschedule ? backfill_success : backfill_snooze;
    }

    UniqueDCPBackfillPtr backfill;
    Source source;
    std::tie(backfill, source) = dequeueNextBackfill(lh);

    // If no backfills ready to run then snooze
    if (!backfill) {
        return backfill_snooze;
    }

    // Run the backfill without holding the manager lock - it may perform
    // disk I/O and will call back into bytesCheckAndRead()/bytesForceRead().
    lh.unlock();
    backfill_status_t status = backfill->run();
    lh.lock();

    // The scan buffer budget is per-run; reset it for the next slice.
    scanBuffer.bytesRead = 0;
    scanBuffer.itemsRead = 0;

    // Requeue (or retire) the backfill based on how its run ended.
    switch (status) {
        case backfill_success:
            switch (scheduleOrder) {
            case ScheduleOrder::RoundRobin:
                activeBackfills.push_back(std::move(backfill));
                break;
            case ScheduleOrder::Sequential:
                switch (source) {
                case Source::Active:
                    // If the source is active then this is the "current"
                    // backfill we are working our way through, we want
                    // to put it back on the front of the active queue to
                    // run next time.
                    activeBackfills.push_front(std::move(backfill));
                    break;
                case Source::Initializing:
                    // New - this was only run to initialise it; it should
                    // now go to the back of the active queue so the
                    // "current" Backfill can resume to completion.
                    // round-robin and sequential:
                    activeBackfills.push_back(std::move(backfill));
                    break;
                }
                break;
            }
            break;
        case backfill_finished:
            // Backfill complete - release its tracker slot (outside the
            // lock) and let the unique_ptr destroy it.
            lh.unlock();
            backfillTracker.decrNumActiveSnoozingBackfills();
            break;
        case backfill_snooze: {
            // Park it with a timestamp; moveSnoozingToActiveQueue() will
            // reactivate it after sleepTime seconds.
            snoozingBackfills.emplace_back(ep_current_time(),
                                           std::move(backfill));
            break;
        }
    }

    // A backfill was run, so ask the task to run again regardless of status.
    return backfill_success;
}
386
387void BackfillManager::movePendingToInitializing() {
388    while (!pendingBackfills.empty() &&
389           backfillTracker.canAddBackfillToActiveQ()) {
390        initializingBackfills.splice(initializingBackfills.end(),
391                                     pendingBackfills,
392                                     pendingBackfills.begin());
393    }
394}
395
396void BackfillManager::moveSnoozingToActiveQueue() {
397    while (!snoozingBackfills.empty()) {
398        std::pair<rel_time_t, UniqueDCPBackfillPtr> snoozer =
399                std::move(snoozingBackfills.front());
400        snoozingBackfills.pop_front();
401        // If snoozing task is found to be sleeping for greater than
402        // allowed snoozetime, push into active queue
403        if (snoozer.first + sleepTime <= ep_current_time()) {
404            activeBackfills.push_back(std::move(snoozer.second));
405        } else {
406            // Push back the popped snoozing backfill
407            snoozingBackfills.push_back(std::move(snoozer));
408            break;
409        }
410    }
411}
412
413std::pair<UniqueDCPBackfillPtr, BackfillManager::Source>
414BackfillManager::dequeueNextBackfill(std::unique_lock<std::mutex>&) {
415    // Dequeue from initializingBackfills if non-empty, else activeBackfills.
416    auto& queue = initializingBackfills.empty() ? activeBackfills
417                                                : initializingBackfills;
418    auto source = initializingBackfills.empty() ? Source::Active
419                                                : Source::Initializing;
420    if (!queue.empty()) {
421        auto next = std::move(queue.front());
422        queue.pop_front();
423        return {std::move(next), source};
424    }
425    return {};
426}
427
// Wake the BackfillManagerTask (if one exists) so it runs backfill() again
// without waiting for its snooze period to elapse.
void BackfillManager::wakeUpTask() {
    LockHolder lh(lock);
    if (managerTask) {
        ExecutorPool::get()->wake(managerTask->getId());
    }
}
434std::string BackfillManager::to_string(BackfillManager::ScheduleOrder order) {
435    switch (order) {
436    case BackfillManager::ScheduleOrder::RoundRobin:
437        return "round-robin";
438    case BackfillManager::ScheduleOrder::Sequential:
439        return "sequential";
440    }
441    folly::assume_unreachable();
442}
443