1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2018 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #include "item_compressor.h"
19 
20 #include <phosphor/phosphor.h>
21 
22 #include "ep_engine.h"
23 #include "executorpool.h"
24 #include "item_compressor_visitor.h"
25 #include "kv_bucket.h"
26 #include "stored-value.h"
27 
ItemCompressorTask(EventuallyPersistentEngine* e, EPStats& stats_)28 ItemCompressorTask::ItemCompressorTask(EventuallyPersistentEngine* e,
29                                        EPStats& stats_)
30     : GlobalTask(e, TaskId::ItemCompressorTask, 0, false),
31       stats(stats_),
32       epstore_position(engine->getKVBucket()->startPosition()) {
33 }
34 
run(void)35 bool ItemCompressorTask::run(void) {
36     TRACE_EVENT0("ep-engine/task", "ItemCompressorTask");
37     if (engine->getCompressionMode() == BucketCompressionMode::Active) {
38         // Get our pause/resume visitor. If we didn't finish the previous pass,
39         // then resume from where we last were, otherwise create a new visitor
40         // starting from the beginning.
41         if (!prAdapter) {
42             prAdapter = std::make_unique<PauseResumeVBAdapter>(
43                     std::make_unique<ItemCompressorVisitor>());
44             epstore_position = engine->getKVBucket()->startPosition();
45         }
46 
47         // Print start status.
48         std::stringstream ss;
49         ss << getDescription() << " for bucket '" << engine->getName() << "'";
50         if (epstore_position == engine->getKVBucket()->startPosition()) {
51             ss << " starting. ";
52         } else {
53             ss << " resuming from " << epstore_position << ", ";
54             ss << prAdapter->getHashtablePosition() << ".";
55         }
56         ss << " Using chunk_duration=" << getChunkDuration().count() << " ms."
57            << " mem_used=" << stats.getEstimatedTotalMemoryUsed();
58         LOG(EXTENSION_LOG_INFO, "%s", ss.str().c_str());
59 
60         // Prepare the underlying visitor.
61         auto& visitor = getItemCompressorVisitor();
62         const auto start = ProcessClock::now();
63         const auto deadline = start + getChunkDuration();
64         visitor.setDeadline(deadline);
65         visitor.clearStats();
66         visitor.setCompressionMode(engine->getCompressionMode());
67         visitor.setMinCompressionRatio(engine->getMinCompressionRatio());
68 
69         // Do it - set off the visitor.
70         epstore_position = engine->getKVBucket()->pauseResumeVisit(
71                 *prAdapter, epstore_position);
72         const auto end = ProcessClock::now();
73 
74         // Update stats
75         stats.compressorNumCompressed.fetch_add(visitor.getCompressedCount());
76         stats.compressorNumVisited.fetch_add(visitor.getVisitedCount());
77 
78         // Check if the visitor completed a full pass.
79         bool completed =
80                 (epstore_position == engine->getKVBucket()->endPosition());
81 
82         // Print status.
83         ss.str("");
84         ss << getDescription() << " for bucket '" << engine->getName() << "'";
85         if (completed) {
86             ss << " finished.";
87         } else {
88             ss << " paused at position " << epstore_position << ".";
89         }
90         std::chrono::microseconds duration =
91                 std::chrono::duration_cast<std::chrono::microseconds>(end -
92                                                                       start);
93         ss << " Took " << duration.count() << " us."
94            << " compressed " << visitor.getCompressedCount() << "/"
95            << visitor.getVisitedCount() << " visited documents."
96            << " mem_used=" << stats.getEstimatedTotalMemoryUsed()
97            << ".Sleeping for " << getSleepTime() << " seconds.";
98         LOG(EXTENSION_LOG_INFO, "%s", ss.str().c_str());
99 
100         // Delete(reset) visitor if it finished.
101         if (completed) {
102             prAdapter.reset();
103         }
104     }
105 
106     snooze(getSleepTime());
107     if (engine->getEpStats().isShutdown) {
108         return false;
109     }
110     return true;
111 }
112 
stop(void)113 void ItemCompressorTask::stop(void) {
114     if (uid) {
115         ExecutorPool::get()->cancel(uid);
116     }
117 }
118 
getDescription()119 std::string ItemCompressorTask::getDescription() {
120     return "Item Compressor";
121 }
122 
maxExpectedDuration()123 std::chrono::microseconds ItemCompressorTask::maxExpectedDuration() {
124     // The item compressor processes items in chunks, with each chunk
125     // constrained by a ChunkDuration runtime, so we expect to only take
126     // that long. However, the ProgressTracker used estimates the time
127     // remaining, so apply some headroom to that figure so we don't get
128     // inundated with spurious "slow tasks" which only just exceed the limit.
129     return getChunkDuration() * 10;
130 }
131 
getSleepTime() const132 double ItemCompressorTask::getSleepTime() const {
133     return (engine->getConfiguration().getItemCompressorInterval() * 0.001);
134 }
135 
getChunkDuration() const136 std::chrono::milliseconds ItemCompressorTask::getChunkDuration() const {
137     return std::chrono::milliseconds(
138             engine->getConfiguration().getItemCompressorChunkDuration());
139 }
140 
getItemCompressorVisitor()141 ItemCompressorVisitor& ItemCompressorTask::getItemCompressorVisitor() {
142     return dynamic_cast<ItemCompressorVisitor&>(prAdapter->getHTVisitor());
143 }
144