1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2017 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #include "config.h"
19 
20 #include "dcp/backfill_memory.h"
21 #include "dcp/stream.h"
22 #include "ep_engine.h"
23 #include "ep_time.h"
24 #include "ephemeral_vb.h"
25 #include "seqlist.h"
26 
27 #include <phosphor/phosphor.h>
28 
DCPBackfillMemory(EphemeralVBucketPtr evb, std::shared_ptr<ActiveStream> s, uint64_t startSeqno, uint64_t endSeqno)29 DCPBackfillMemory::DCPBackfillMemory(EphemeralVBucketPtr evb,
30                                      std::shared_ptr<ActiveStream> s,
31                                      uint64_t startSeqno,
32                                      uint64_t endSeqno)
33     : DCPBackfill(s, startSeqno, endSeqno), weakVb(evb) {
34 }
35 
run()36 backfill_status_t DCPBackfillMemory::run() {
37     auto evb = weakVb.lock();
38     auto stream = streamPtr.lock();
39     if (!evb && !stream) {
40         /* We don't have to close the stream here. Task doing vbucket state
41          change should handle stream closure */
42         LOG(EXTENSION_LOG_WARNING,
43             "DCPBackfillMemory::run(): "
44             "(vb:%d) running backfill ended prematurely as the associated %s "
45             "is deleted; start seqno:%" PRIi64 ", end seqno:%" PRIi64,
46             getVBucketId(),
47             evb ? "vbucket" : "stream",
48             startSeqno,
49             endSeqno);
50         return backfill_finished;
51     }
52 
53     /* Get vb state lock */
54     ReaderLockHolder rlh(evb->getStateLock());
55     if (evb->getState() == vbucket_state_dead) {
56         /* We don't have to close the stream here. Task doing vbucket state
57            change should handle stream closure */
58         LOG(EXTENSION_LOG_WARNING,
59             "DCPBackfillMemory::run(): "
60             "(vb:%d) running backfill ended prematurely with vb in dead state; "
61             "start seqno:%" PRIi64 ", end seqno:%" PRIi64,
62             getVBucketId(),
63             startSeqno,
64             endSeqno);
65         return backfill_finished;
66     }
67 
68     /* Get sequence of items (backfill) from memory */
69     ENGINE_ERROR_CODE status;
70     std::vector<UniqueItemPtr> items;
71     seqno_t adjustedEndSeqno;
72     std::tie(status, items, adjustedEndSeqno) =
73             evb->inMemoryBackfill(startSeqno, endSeqno);
74 
75     /* Handle any failures */
76     if (status != ENGINE_SUCCESS) {
77         LOG(EXTENSION_LOG_WARNING,
78             "DCPBackfillMemory::run(): "
79             "(vb:%d) running backfill failed with error %d ; "
80             "start seqno:%" PRIi64 ", end seqno:%" PRIi64
81             ". "
82             "Hence closing the stream",
83             getVBucketId(),
84             status,
85             startSeqno,
86             endSeqno);
87         /* Close the stream, DCP clients can retry */
88         stream->setDead(END_STREAM_BACKFILL_FAIL);
89         return backfill_finished;
90     }
91 
92     /* Put items onto readyQ of the DCP stream */
93     stream->incrBackfillRemaining(items.size());
94 
95     /* Mark disk snapshot */
96     stream->markDiskSnapshot(startSeqno, adjustedEndSeqno);
97 
98     /* Move every item to the stream */
99     for (auto& item : items) {
100         stream->backfillReceived(
101                 std::move(item), BACKFILL_FROM_MEMORY, /*force*/ true);
102     }
103 
104     /* Indicate completion to the stream */
105     stream->completeBackfill();
106 
107     return backfill_finished;
108 }
109 
DCPBackfillMemoryBuffered( EphemeralVBucketPtr evb, std::shared_ptr<ActiveStream> s, uint64_t startSeqno, uint64_t endSeqno)110 DCPBackfillMemoryBuffered::DCPBackfillMemoryBuffered(
111         EphemeralVBucketPtr evb,
112         std::shared_ptr<ActiveStream> s,
113         uint64_t startSeqno,
114         uint64_t endSeqno)
115     : DCPBackfill(s, startSeqno, endSeqno),
116       evb(evb),
117       state(BackfillState::Init),
118       rangeItr(nullptr),
119       vbid(evb->getId()) {
120     TRACE_ASYNC_START1(
121             "dcp/backfill", "DCPBackfillMemoryBuffered", this, "vbid", vbid);
122 }
123 
~DCPBackfillMemoryBuffered()124 DCPBackfillMemoryBuffered::~DCPBackfillMemoryBuffered() {
125     TRACE_ASYNC_END1(
126             "dcp/backfill", "DCPBackfillMemoryBuffered", this, "vbid", vbid);
127 }
128 
run()129 backfill_status_t DCPBackfillMemoryBuffered::run() {
130     ReaderLockHolder rlh(evb->getStateLock());
131     if (evb->getState() == vbucket_state_dead) {
132         /* We don't have to close the stream here. Task doing vbucket state
133            change should handle stream closure */
134         LOG(EXTENSION_LOG_WARNING,
135             "DCPBackfillMemoryBuffered::run(): "
136             "(vb:%d) running backfill ended prematurely with vb in dead state; "
137             "start seqno:%" PRIi64 ", end seqno:%" PRIi64,
138             getVBucketId(),
139             startSeqno,
140             endSeqno);
141         return backfill_finished;
142     }
143 
144     TRACE_EVENT2("dcp/backfill",
145                  "MemoryBuffered::run",
146                  "vbid",
147                  evb->getId(),
148                  "state",
149                  uint8_t(state));
150 
151     switch (state) {
152     case BackfillState::Init:
153         return create();
154     case BackfillState::Scanning:
155         return scan();
156     case BackfillState::Done:
157         return backfill_finished;
158     }
159 
160     throw std::logic_error("DCPBackfillDisk::run: Invalid backfill state " +
161                            backfillStateToString(state));
162 }
163 
cancel()164 void DCPBackfillMemoryBuffered::cancel() {
165     if (state != BackfillState::Done) {
166         complete(true);
167     }
168 }
169 
create()170 backfill_status_t DCPBackfillMemoryBuffered::create() {
171     TRACE_EVENT1(
172             "dcp/backfill", "MemoryBuffered::create", "vbid", evb->getId());
173 
174     auto stream = streamPtr.lock();
175     if (!stream) {
176         LOG(EXTENSION_LOG_WARNING,
177             "DCPBackfillMemoryBuffered::create(): "
178             "(vb:%d) backfill create ended prematurely as the associated "
179             "stream is deleted by the producer conn ",
180             getVBucketId());
181         transitionState(BackfillState::Done);
182         return backfill_finished;
183     }
184 
185     /* Create range read cursor */
186     try {
187         auto rangeItrOptional = evb->makeRangeIterator(true /*isBackfill*/);
188         if (rangeItrOptional) {
189             rangeItr = std::move(*rangeItrOptional);
190         } else {
191             stream->log(EXTENSION_LOG_INFO,
192                         "vb:%" PRIu16
193                         " Deferring backfill creation as another "
194                         "range iterator is already on the sequence list",
195                         getVBucketId());
196             return backfill_snooze;
197         }
198     } catch (const std::bad_alloc&) {
199         stream->log(EXTENSION_LOG_WARNING,
200                     "Alloc error when trying to create a range iterator"
201                     "on the sequence list for (vb %" PRIu16 ")",
202                     getVBucketId());
203         /* Try backfilling again later; here we snooze because system has
204            hit ENOMEM */
205         return backfill_snooze;
206     }
207 
208     /* Check startSeqno against the purge-seqno of the vb.
209      * If the startSeqno != 1 (a 0 to n request) then startSeqno must be
210      * greater than purgeSeqno. */
211     if (startSeqno != 1 && (startSeqno <= evb->getPurgeSeqno())) {
212         LOG(EXTENSION_LOG_WARNING,
213             "DCPBackfillMemoryBuffered::create(): "
214             "(vb:%" PRIu16
215             ") running backfill failed because the startSeqno:%" PRIu64
216             " is < purgeSeqno:%" PRIu64,
217             getVBucketId(),
218             startSeqno,
219             evb->getPurgeSeqno());
220         stream->setDead(END_STREAM_ROLLBACK);
221         return backfill_finished;
222     }
223 
224     /* Advance the cursor till start, mark snapshot and update backfill
225        remaining count */
226     while (rangeItr.curr() != rangeItr.end()) {
227         if (static_cast<uint64_t>((*rangeItr).getBySeqno()) >= startSeqno) {
228             /* Incr backfill remaining
229                [EPHE TODO]: This will be inaccurate if do not backfill till end
230                             of the iterator
231              */
232             stream->incrBackfillRemaining(rangeItr.count());
233 
234             /* Determine the endSeqno of the current snapshot.
235                We want to send till requested endSeqno, but if that cannot
236                constitute a snapshot then we need to send till the point
237                which can be called as snapshot end */
238             endSeqno = std::max(
239                     endSeqno,
240                     static_cast<uint64_t>(rangeItr.getEarlySnapShotEnd()));
241 
242             /* We want to send items only till the point it is necessary to do
243                so */
244             endSeqno =
245                     std::min(endSeqno, static_cast<uint64_t>(rangeItr.back()));
246 
247             /* Mark disk snapshot */
248             stream->markDiskSnapshot(startSeqno, endSeqno);
249 
250             /* Change the backfill state */
251             transitionState(BackfillState::Scanning);
252 
253             /* Jump to scan here itself */
254             return scan();
255         }
256         ++rangeItr;
257     }
258 
259     /* Backfill is not needed as startSeqno > rangeItr end seqno */
260     complete(false);
261     return backfill_success;
262 }
263 
scan()264 backfill_status_t DCPBackfillMemoryBuffered::scan() {
265     TRACE_EVENT2("dcp/backfill",
266                  "MemoryBuffered::scan",
267                  "currSeqno",
268                  rangeItr.curr(),
269                  "endSeqno",
270                  endSeqno);
271 
272     auto stream = streamPtr.lock();
273     if (!stream) {
274         LOG(EXTENSION_LOG_WARNING,
275             "DCPBackfillMemoryBuffered::scan(): "
276             "(vb:%d) backfill create ended prematurely as the associated "
277             "stream is deleted by the producer conn ",
278             getVBucketId());
279         transitionState(BackfillState::Done);
280         return backfill_finished;
281     }
282 
283     if (!(stream->isActive())) {
284         /* Stop prematurely if the stream state changes */
285         complete(true);
286         return backfill_success;
287     }
288 
289     /* Read items */
290     UniqueItemPtr item;
291     while (static_cast<uint64_t>(rangeItr.curr()) <= endSeqno) {
292         try {
293             // MB-27199: toItem will read the StoredValue members, which are
294             // mutated with the HashBucketLock, so get the correct bucket lock
295             // before calling StoredValue::toItem
296             auto hbl = evb->ht.getLockedBucket((*rangeItr).getKey());
297             item = (*rangeItr).toItem(false, getVBucketId());
298             // A deleted ephemeral item stores the delete time under a delete
299             // time field, this must be copied to the expiry time so that DCP
300             // can transmit the original time of deletion
301             if (item->isDeleted()) {
302                 item->setExpTime(ep_abs_time((*rangeItr).getDeletedTime()));
303             }
304         } catch (const std::bad_alloc&) {
305             stream->log(EXTENSION_LOG_WARNING,
306                         "Alloc error when trying to create an "
307                         "item copy from hash table. Item seqno:%" PRIi64
308                         ", vb:%" PRIu16,
309                         (*rangeItr).getBySeqno(),
310                         getVBucketId());
311             /* Try backfilling again later; here we snooze because system has
312                hit ENOMEM */
313             return backfill_snooze;
314         }
315 
316         int64_t seqnoDbg = item->getBySeqno();
317         if (!stream->backfillReceived(
318                     std::move(item), BACKFILL_FROM_MEMORY, /*force*/ false)) {
319             /* Try backfill again later; here we do not snooze because we
320                want to check if other backfills can be run by the
321                backfillMgr */
322             TRACE_INSTANT("dcp/backfill", "ScanDefer", "seqno", seqnoDbg);
323             stream->log(EXTENSION_LOG_INFO,
324                         "vb:%" PRIu16 " Deferring backfill at seqno:%" PRIi64
325                         "as scan buffer or backfill buffer is full",
326                         getVBucketId(),
327                         seqnoDbg);
328             return backfill_success;
329         }
330         ++rangeItr;
331     }
332 
333     /* Backfill has ran to completion */
334     complete(false);
335 
336     return backfill_success;
337 }
338 
complete(bool cancelled)339 void DCPBackfillMemoryBuffered::complete(bool cancelled) {
340     TRACE_EVENT1(
341             "dcp/backfill", "MemoryBuffered::complete", "cancelled", cancelled);
342 
343     auto stream = streamPtr.lock();
344     if (!stream) {
345         LOG(EXTENSION_LOG_WARNING,
346             "DCPBackfillMemoryBuffered::complete(): "
347             "(vb:%d) backfill create ended prematurely as the associated "
348             "stream is deleted by the producer conn; %s",
349             getVBucketId(),
350             cancelled ? "cancelled" : "finished");
351         transitionState(BackfillState::Done);
352         return;
353     }
354 
355     /* [EPHE TODO]: invalidate cursor sooner before it gets deleted */
356 
357     stream->completeBackfill();
358 
359     EXTENSION_LOG_LEVEL severity =
360             cancelled ? EXTENSION_LOG_NOTICE : EXTENSION_LOG_INFO;
361     stream->log(severity,
362                 "(vb %d) Backfill task (%" PRIu64 " to %" PRIu64 ") %s",
363                 getVBucketId(),
364                 startSeqno,
365                 endSeqno,
366                 cancelled ? "cancelled" : "finished");
367 
368     transitionState(BackfillState::Done);
369 }
370 
transitionState(BackfillState newState)371 void DCPBackfillMemoryBuffered::transitionState(BackfillState newState) {
372     if (state == newState) {
373         return;
374     }
375 
376     bool validTransition = false;
377     switch (newState) {
378     case BackfillState::Init:
379         /* Not valid to transition back to 'init' */
380         break;
381     case BackfillState::Scanning:
382         if (state == BackfillState::Init) {
383             validTransition = true;
384         }
385         break;
386     case BackfillState::Done:
387         if (state == BackfillState::Init || state == BackfillState::Scanning) {
388             validTransition = true;
389         }
390         break;
391     }
392 
393     if (!validTransition) {
394         throw std::invalid_argument(
395                 "DCPBackfillMemoryBuffered::transitionState:"
396                 " newState (which is " +
397                 backfillStateToString(newState) +
398                 ") is not valid for current state (which is " +
399                 backfillStateToString(state) + ")");
400     }
401 
402     state = newState;
403 }
404 
backfillStateToString( BackfillState state)405 std::string DCPBackfillMemoryBuffered::backfillStateToString(
406         BackfillState state) {
407     switch (state) {
408     case BackfillState::Init:
409         return "initalizing";
410     case BackfillState::Scanning:
411         return "scanning";
412     case BackfillState::Done:
413         return "done";
414     }
415     return "Invalid state"; // dummy to avert certain compiler warnings
416 }
417