1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3 * Copyright 2017 Couchbase, Inc
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 #include "config.h"
19
20 #include "dcp/backfill_memory.h"
21 #include "dcp/stream.h"
22 #include "ep_engine.h"
23 #include "ep_time.h"
24 #include "ephemeral_vb.h"
25 #include "seqlist.h"
26
27 #include <phosphor/phosphor.h>
28
DCPBackfillMemory(EphemeralVBucketPtr evb, std::shared_ptr<ActiveStream> s, uint64_t startSeqno, uint64_t endSeqno)29 DCPBackfillMemory::DCPBackfillMemory(EphemeralVBucketPtr evb,
30 std::shared_ptr<ActiveStream> s,
31 uint64_t startSeqno,
32 uint64_t endSeqno)
33 : DCPBackfill(s, startSeqno, endSeqno), weakVb(evb) {
34 }
35
run()36 backfill_status_t DCPBackfillMemory::run() {
37 auto evb = weakVb.lock();
38 auto stream = streamPtr.lock();
39 if (!evb && !stream) {
40 /* We don't have to close the stream here. Task doing vbucket state
41 change should handle stream closure */
42 LOG(EXTENSION_LOG_WARNING,
43 "DCPBackfillMemory::run(): "
44 "(vb:%d) running backfill ended prematurely as the associated %s "
45 "is deleted; start seqno:%" PRIi64 ", end seqno:%" PRIi64,
46 getVBucketId(),
47 evb ? "vbucket" : "stream",
48 startSeqno,
49 endSeqno);
50 return backfill_finished;
51 }
52
53 /* Get vb state lock */
54 ReaderLockHolder rlh(evb->getStateLock());
55 if (evb->getState() == vbucket_state_dead) {
56 /* We don't have to close the stream here. Task doing vbucket state
57 change should handle stream closure */
58 LOG(EXTENSION_LOG_WARNING,
59 "DCPBackfillMemory::run(): "
60 "(vb:%d) running backfill ended prematurely with vb in dead state; "
61 "start seqno:%" PRIi64 ", end seqno:%" PRIi64,
62 getVBucketId(),
63 startSeqno,
64 endSeqno);
65 return backfill_finished;
66 }
67
68 /* Get sequence of items (backfill) from memory */
69 ENGINE_ERROR_CODE status;
70 std::vector<UniqueItemPtr> items;
71 seqno_t adjustedEndSeqno;
72 std::tie(status, items, adjustedEndSeqno) =
73 evb->inMemoryBackfill(startSeqno, endSeqno);
74
75 /* Handle any failures */
76 if (status != ENGINE_SUCCESS) {
77 LOG(EXTENSION_LOG_WARNING,
78 "DCPBackfillMemory::run(): "
79 "(vb:%d) running backfill failed with error %d ; "
80 "start seqno:%" PRIi64 ", end seqno:%" PRIi64
81 ". "
82 "Hence closing the stream",
83 getVBucketId(),
84 status,
85 startSeqno,
86 endSeqno);
87 /* Close the stream, DCP clients can retry */
88 stream->setDead(END_STREAM_BACKFILL_FAIL);
89 return backfill_finished;
90 }
91
92 /* Put items onto readyQ of the DCP stream */
93 stream->incrBackfillRemaining(items.size());
94
95 /* Mark disk snapshot */
96 stream->markDiskSnapshot(startSeqno, adjustedEndSeqno);
97
98 /* Move every item to the stream */
99 for (auto& item : items) {
100 stream->backfillReceived(
101 std::move(item), BACKFILL_FROM_MEMORY, /*force*/ true);
102 }
103
104 /* Indicate completion to the stream */
105 stream->completeBackfill();
106
107 return backfill_finished;
108 }
109
DCPBackfillMemoryBuffered( EphemeralVBucketPtr evb, std::shared_ptr<ActiveStream> s, uint64_t startSeqno, uint64_t endSeqno)110 DCPBackfillMemoryBuffered::DCPBackfillMemoryBuffered(
111 EphemeralVBucketPtr evb,
112 std::shared_ptr<ActiveStream> s,
113 uint64_t startSeqno,
114 uint64_t endSeqno)
115 : DCPBackfill(s, startSeqno, endSeqno),
116 evb(evb),
117 state(BackfillState::Init),
118 rangeItr(nullptr),
119 vbid(evb->getId()) {
120 TRACE_ASYNC_START1(
121 "dcp/backfill", "DCPBackfillMemoryBuffered", this, "vbid", vbid);
122 }
123
~DCPBackfillMemoryBuffered()124 DCPBackfillMemoryBuffered::~DCPBackfillMemoryBuffered() {
125 TRACE_ASYNC_END1(
126 "dcp/backfill", "DCPBackfillMemoryBuffered", this, "vbid", vbid);
127 }
128
run()129 backfill_status_t DCPBackfillMemoryBuffered::run() {
130 ReaderLockHolder rlh(evb->getStateLock());
131 if (evb->getState() == vbucket_state_dead) {
132 /* We don't have to close the stream here. Task doing vbucket state
133 change should handle stream closure */
134 LOG(EXTENSION_LOG_WARNING,
135 "DCPBackfillMemoryBuffered::run(): "
136 "(vb:%d) running backfill ended prematurely with vb in dead state; "
137 "start seqno:%" PRIi64 ", end seqno:%" PRIi64,
138 getVBucketId(),
139 startSeqno,
140 endSeqno);
141 return backfill_finished;
142 }
143
144 TRACE_EVENT2("dcp/backfill",
145 "MemoryBuffered::run",
146 "vbid",
147 evb->getId(),
148 "state",
149 uint8_t(state));
150
151 switch (state) {
152 case BackfillState::Init:
153 return create();
154 case BackfillState::Scanning:
155 return scan();
156 case BackfillState::Done:
157 return backfill_finished;
158 }
159
160 throw std::logic_error("DCPBackfillDisk::run: Invalid backfill state " +
161 backfillStateToString(state));
162 }
163
cancel()164 void DCPBackfillMemoryBuffered::cancel() {
165 if (state != BackfillState::Done) {
166 complete(true);
167 }
168 }
169
create()170 backfill_status_t DCPBackfillMemoryBuffered::create() {
171 TRACE_EVENT1(
172 "dcp/backfill", "MemoryBuffered::create", "vbid", evb->getId());
173
174 auto stream = streamPtr.lock();
175 if (!stream) {
176 LOG(EXTENSION_LOG_WARNING,
177 "DCPBackfillMemoryBuffered::create(): "
178 "(vb:%d) backfill create ended prematurely as the associated "
179 "stream is deleted by the producer conn ",
180 getVBucketId());
181 transitionState(BackfillState::Done);
182 return backfill_finished;
183 }
184
185 /* Create range read cursor */
186 try {
187 auto rangeItrOptional = evb->makeRangeIterator(true /*isBackfill*/);
188 if (rangeItrOptional) {
189 rangeItr = std::move(*rangeItrOptional);
190 } else {
191 stream->log(EXTENSION_LOG_INFO,
192 "vb:%" PRIu16
193 " Deferring backfill creation as another "
194 "range iterator is already on the sequence list",
195 getVBucketId());
196 return backfill_snooze;
197 }
198 } catch (const std::bad_alloc&) {
199 stream->log(EXTENSION_LOG_WARNING,
200 "Alloc error when trying to create a range iterator"
201 "on the sequence list for (vb %" PRIu16 ")",
202 getVBucketId());
203 /* Try backfilling again later; here we snooze because system has
204 hit ENOMEM */
205 return backfill_snooze;
206 }
207
208 /* Check startSeqno against the purge-seqno of the vb.
209 * If the startSeqno != 1 (a 0 to n request) then startSeqno must be
210 * greater than purgeSeqno. */
211 if (startSeqno != 1 && (startSeqno <= evb->getPurgeSeqno())) {
212 LOG(EXTENSION_LOG_WARNING,
213 "DCPBackfillMemoryBuffered::create(): "
214 "(vb:%" PRIu16
215 ") running backfill failed because the startSeqno:%" PRIu64
216 " is < purgeSeqno:%" PRIu64,
217 getVBucketId(),
218 startSeqno,
219 evb->getPurgeSeqno());
220 stream->setDead(END_STREAM_ROLLBACK);
221 return backfill_finished;
222 }
223
224 /* Advance the cursor till start, mark snapshot and update backfill
225 remaining count */
226 while (rangeItr.curr() != rangeItr.end()) {
227 if (static_cast<uint64_t>((*rangeItr).getBySeqno()) >= startSeqno) {
228 /* Incr backfill remaining
229 [EPHE TODO]: This will be inaccurate if do not backfill till end
230 of the iterator
231 */
232 stream->incrBackfillRemaining(rangeItr.count());
233
234 /* Determine the endSeqno of the current snapshot.
235 We want to send till requested endSeqno, but if that cannot
236 constitute a snapshot then we need to send till the point
237 which can be called as snapshot end */
238 endSeqno = std::max(
239 endSeqno,
240 static_cast<uint64_t>(rangeItr.getEarlySnapShotEnd()));
241
242 /* We want to send items only till the point it is necessary to do
243 so */
244 endSeqno =
245 std::min(endSeqno, static_cast<uint64_t>(rangeItr.back()));
246
247 /* Mark disk snapshot */
248 stream->markDiskSnapshot(startSeqno, endSeqno);
249
250 /* Change the backfill state */
251 transitionState(BackfillState::Scanning);
252
253 /* Jump to scan here itself */
254 return scan();
255 }
256 ++rangeItr;
257 }
258
259 /* Backfill is not needed as startSeqno > rangeItr end seqno */
260 complete(false);
261 return backfill_success;
262 }
263
scan()264 backfill_status_t DCPBackfillMemoryBuffered::scan() {
265 TRACE_EVENT2("dcp/backfill",
266 "MemoryBuffered::scan",
267 "currSeqno",
268 rangeItr.curr(),
269 "endSeqno",
270 endSeqno);
271
272 auto stream = streamPtr.lock();
273 if (!stream) {
274 LOG(EXTENSION_LOG_WARNING,
275 "DCPBackfillMemoryBuffered::scan(): "
276 "(vb:%d) backfill create ended prematurely as the associated "
277 "stream is deleted by the producer conn ",
278 getVBucketId());
279 transitionState(BackfillState::Done);
280 return backfill_finished;
281 }
282
283 if (!(stream->isActive())) {
284 /* Stop prematurely if the stream state changes */
285 complete(true);
286 return backfill_success;
287 }
288
289 /* Read items */
290 UniqueItemPtr item;
291 while (static_cast<uint64_t>(rangeItr.curr()) <= endSeqno) {
292 try {
293 // MB-27199: toItem will read the StoredValue members, which are
294 // mutated with the HashBucketLock, so get the correct bucket lock
295 // before calling StoredValue::toItem
296 auto hbl = evb->ht.getLockedBucket((*rangeItr).getKey());
297 item = (*rangeItr).toItem(false, getVBucketId());
298 // A deleted ephemeral item stores the delete time under a delete
299 // time field, this must be copied to the expiry time so that DCP
300 // can transmit the original time of deletion
301 if (item->isDeleted()) {
302 item->setExpTime(ep_abs_time((*rangeItr).getDeletedTime()));
303 }
304 } catch (const std::bad_alloc&) {
305 stream->log(EXTENSION_LOG_WARNING,
306 "Alloc error when trying to create an "
307 "item copy from hash table. Item seqno:%" PRIi64
308 ", vb:%" PRIu16,
309 (*rangeItr).getBySeqno(),
310 getVBucketId());
311 /* Try backfilling again later; here we snooze because system has
312 hit ENOMEM */
313 return backfill_snooze;
314 }
315
316 int64_t seqnoDbg = item->getBySeqno();
317 if (!stream->backfillReceived(
318 std::move(item), BACKFILL_FROM_MEMORY, /*force*/ false)) {
319 /* Try backfill again later; here we do not snooze because we
320 want to check if other backfills can be run by the
321 backfillMgr */
322 TRACE_INSTANT("dcp/backfill", "ScanDefer", "seqno", seqnoDbg);
323 stream->log(EXTENSION_LOG_INFO,
324 "vb:%" PRIu16 " Deferring backfill at seqno:%" PRIi64
325 "as scan buffer or backfill buffer is full",
326 getVBucketId(),
327 seqnoDbg);
328 return backfill_success;
329 }
330 ++rangeItr;
331 }
332
333 /* Backfill has ran to completion */
334 complete(false);
335
336 return backfill_success;
337 }
338
complete(bool cancelled)339 void DCPBackfillMemoryBuffered::complete(bool cancelled) {
340 TRACE_EVENT1(
341 "dcp/backfill", "MemoryBuffered::complete", "cancelled", cancelled);
342
343 auto stream = streamPtr.lock();
344 if (!stream) {
345 LOG(EXTENSION_LOG_WARNING,
346 "DCPBackfillMemoryBuffered::complete(): "
347 "(vb:%d) backfill create ended prematurely as the associated "
348 "stream is deleted by the producer conn; %s",
349 getVBucketId(),
350 cancelled ? "cancelled" : "finished");
351 transitionState(BackfillState::Done);
352 return;
353 }
354
355 /* [EPHE TODO]: invalidate cursor sooner before it gets deleted */
356
357 stream->completeBackfill();
358
359 EXTENSION_LOG_LEVEL severity =
360 cancelled ? EXTENSION_LOG_NOTICE : EXTENSION_LOG_INFO;
361 stream->log(severity,
362 "(vb %d) Backfill task (%" PRIu64 " to %" PRIu64 ") %s",
363 getVBucketId(),
364 startSeqno,
365 endSeqno,
366 cancelled ? "cancelled" : "finished");
367
368 transitionState(BackfillState::Done);
369 }
370
transitionState(BackfillState newState)371 void DCPBackfillMemoryBuffered::transitionState(BackfillState newState) {
372 if (state == newState) {
373 return;
374 }
375
376 bool validTransition = false;
377 switch (newState) {
378 case BackfillState::Init:
379 /* Not valid to transition back to 'init' */
380 break;
381 case BackfillState::Scanning:
382 if (state == BackfillState::Init) {
383 validTransition = true;
384 }
385 break;
386 case BackfillState::Done:
387 if (state == BackfillState::Init || state == BackfillState::Scanning) {
388 validTransition = true;
389 }
390 break;
391 }
392
393 if (!validTransition) {
394 throw std::invalid_argument(
395 "DCPBackfillMemoryBuffered::transitionState:"
396 " newState (which is " +
397 backfillStateToString(newState) +
398 ") is not valid for current state (which is " +
399 backfillStateToString(state) + ")");
400 }
401
402 state = newState;
403 }
404
backfillStateToString( BackfillState state)405 std::string DCPBackfillMemoryBuffered::backfillStateToString(
406 BackfillState state) {
407 switch (state) {
408 case BackfillState::Init:
409 return "initalizing";
410 case BackfillState::Scanning:
411 return "scanning";
412 case BackfillState::Done:
413 return "done";
414 }
415 return "Invalid state"; // dummy to avert certain compiler warnings
416 }
417