1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2018-Present Couchbase, Inc.
4  *
5  *   Use of this software is governed by the Business Source License included
6  *   in the file licenses/BSL-Couchbase.txt.  As of the Change Date specified
7  *   in that file, in accordance with the Business Source License, use of this
8  *   software will be governed by the Apache License, Version 2.0, included in
9  *   the file licenses/APL2.txt.
10  */
11 
12 #include "passive_stream.h"
13 
14 #include "bucket_logger.h"
15 #include "checkpoint_manager.h"
16 #include "collections/vbucket_manifest_handles.h"
17 #include "dcp/consumer.h"
18 #include "dcp/response.h"
19 #include "durability/durability_monitor.h"
20 #include "ep_engine.h"
21 #include "failover-table.h"
22 #include "kv_bucket.h"
23 #include "replicationthrottle.h"
24 #include "vbucket.h"
25 
26 #include <gsl/gsl-lite.hpp>
27 #include <nlohmann/json.hpp>
28 #include <statistics/cbstat_collector.h>
29 
30 #include <memory>
31 
// Logging prefix string for passive streams.
// NOTE(review): the text suggests it is used once the owning consumer
// connection no longer exists - confirm at the use site.
const std::string passiveStreamLoggingPrefix(
        "DCP (Consumer): **Deleted conn**");
34 
35 PassiveStream::PassiveStream(EventuallyPersistentEngine* e,
36                              std::shared_ptr<DcpConsumer> c,
37                              const std::string& name,
38                              uint32_t flags,
39                              uint32_t opaque,
40                              Vbid vb,
41                              uint64_t st_seqno,
42                              uint64_t en_seqno,
43                              uint64_t vb_uuid,
44                              uint64_t snap_start_seqno,
45                              uint64_t snap_end_seqno,
46                              uint64_t vb_high_seqno,
47                              const Collections::ManifestUid vb_manifest_uid)
48     : Stream(name,
49              flags,
50              opaque,
51              vb,
52              st_seqno,
53              en_seqno,
54              vb_uuid,
55              snap_start_seqno,
56              snap_end_seqno),
57       engine(e),
58       consumerPtr(c),
59       last_seqno(vb_high_seqno, {*this}),
60       cur_snapshot_start(0, {*this}),
61       cur_snapshot_end(0, {*this}),
62       cur_snapshot_type(Snapshot::None),
63       cur_snapshot_ack(false),
64       cur_snapshot_prepare(false),
65       vb_manifest_uid(vb_manifest_uid),
66       alwaysBufferOperations(c->shouldBufferOperations()) {
67     std::lock_guard<std::mutex> lh(streamMutex);
68     streamRequest_UNLOCKED(vb_uuid);
69     itemsReady.store(true);
70 }
71 
72 PassiveStream::~PassiveStream() {
73     uint32_t unackedBytes = clearBuffer_UNLOCKED();
74     if (state_ != StreamState::Dead) {
75         // Destructed a "live" stream, log it.
76         log(spdlog::level::level_enum::info,
77             "({}) Destructing stream."
78             " last_seqno is {}, unAckedBytes is {}.",
79             vb_,
80             last_seqno.load(),
81             unackedBytes);
82     }
83 }
84 
85 void PassiveStream::streamRequest(uint64_t vb_uuid) {
86     {
87         std::unique_lock<std::mutex> lh(streamMutex);
88         streamRequest_UNLOCKED(vb_uuid);
89     }
90     notifyStreamReady();
91 }
92 
93 void PassiveStream::streamRequest_UNLOCKED(uint64_t vb_uuid) {
94     auto stream_req_value = createStreamReqValue();
95 
96     /* the stream should send a don't care vb_uuid if start_seqno is 0 */
97     pushToReadyQ(std::make_unique<StreamRequest>(vb_,
98                                                  opaque_,
99                                                  flags_,
100                                                  start_seqno_,
101                                                  end_seqno_,
102                                                  start_seqno_ ? vb_uuid : 0,
103                                                  snap_start_seqno_,
104                                                  snap_end_seqno_,
105                                                  stream_req_value));
106 
107     const char* type = (flags_ & DCP_ADD_STREAM_FLAG_TAKEOVER)
108                                ? "takeover stream"
109                                : "stream";
110 
111     log(spdlog::level::level_enum::info,
112         "({}) Attempting to add {}: opaque_:{}, start_seqno_:{}, "
113         "end_seqno_:{}, vb_uuid:{}, snap_start_seqno_:{}, snap_end_seqno_:{}, "
114         "last_seqno:{}, stream_req_value:{}",
115         vb_,
116         type,
117         opaque_,
118         start_seqno_,
119         end_seqno_,
120         vb_uuid,
121         snap_start_seqno_,
122         snap_end_seqno_,
123         last_seqno.load(),
124         stream_req_value.empty() ? "none" : stream_req_value);
125 }
126 
127 uint32_t PassiveStream::setDead(cb::mcbp::DcpStreamEndStatus status) {
128     /* Hold buffer lock so that we clear out all items before we set the stream
129        to dead state. We do not want to add any new message to the buffer or
130        process any items in the buffer once we set the stream state to dead. */
131     std::unique_lock<std::mutex> lg(buffer.bufMutex);
132     uint32_t unackedBytes = clearBuffer_UNLOCKED();
133     bool killed = false;
134 
135     std::lock_guard<std::mutex> slh(streamMutex);
136     if (transitionState(StreamState::Dead)) {
137         killed = true;
138     }
139 
140     if (killed) {
141         auto severity = spdlog::level::level_enum::info;
142         if (cb::mcbp::DcpStreamEndStatus::Disconnected == status) {
143             severity = spdlog::level::level_enum::warn;
144         }
145         log(severity,
146             "({}) Setting stream to dead state, last_seqno is {}, "
147             "unAckedBytes is {}, status is {}",
148             vb_,
149             last_seqno.load(),
150             unackedBytes,
151             cb::mcbp::to_string(status));
152     }
153     return unackedBytes;
154 }
155 
156 std::string PassiveStream::getStreamTypeName() const {
157     return "Passive";
158 }
159 
160 std::string PassiveStream::getStateName() const {
161     return to_string(state_);
162 }
163 
164 bool PassiveStream::isActive() const {
165     return state_ != StreamState::Dead;
166 }
167 
168 bool PassiveStream::isPending() const {
169     return state_ == StreamState::Pending;
170 }
171 
172 void PassiveStream::acceptStream(cb::mcbp::Status status, uint32_t add_opaque) {
173     VBucketPtr vb = engine->getVBucket(vb_);
174     if (!vb) {
175         log(spdlog::level::level_enum::warn,
176             "({}) PassiveStream::acceptStream(): status:{} - Unable to find "
177             "VBucket - cannot accept Stream.",
178             vb_,
179             status);
180         return;
181     }
182 
183     auto consumer = consumerPtr.lock();
184     if (!consumer) {
185         log(spdlog::level::level_enum::warn,
186             "({}) PassiveStream::acceptStream(): status:{} - Unable to lock "
187             "Consumer - cannot accept Stream.",
188             vb_,
189             status);
190         return;
191     }
192 
193     // We use the cur_snapshot_prepare member to determine if we should
194     // notify the PDM of any Memory snapshots. It is set when we see a
195     // prepare in any snapshot. Consider the following snapshot:
196     //
197     // [1:Prepare(A), 2:Mutation(B)] Type = Memory
198     //
199     // If we have only received and persisted the following sequence of events
200     // but then restart, we would fail to notify the PDM of the complete
201     // snapshot:
202     //
203     // 1) SnapshotMarker (1-2) Type = Memory
204     // 2) Prepare (1)                        <- Persisted to disk
205     //
206     // To solve this, we can fix the cur_snapshot_prepare state on
207     // PassiveStream acceptance. The PDM already avoids acking back the same
208     // seqno, so notifying an extra snapshot shouldn't matter, and even if we
209     // did ack back the same seqno, the ADM should already deal with weakly
210     // monotonic acks as we ack back the HPS on stream connection.
211     cur_snapshot_prepare = true;
212 
213     // SyncReplication: About to commence accepting data on this stream. Check
214     // if the associated consumer supports SyncReplication, so we can later
215     // correctly process Snapshot Markers.
216     supportsSyncReplication = consumer->isSyncReplicationEnabled();
217 
218     // For SyncReplication streams lookup the highPreparedSeqno to check if
219     // we need to re-ACK (after accepting the stream).
220     const int64_t highPreparedSeqno =
221             supportsSyncReplication ? vb->getHighPreparedSeqno() : 0;
222 
223     std::unique_lock<std::mutex> lh(streamMutex);
224     if (isPending()) {
225         pushToReadyQ(std::make_unique<AddStreamResponse>(
226                 add_opaque, opaque_, status));
227         if (status == cb::mcbp::Status::Success) {
228             // Before we receive/process anything else, send a seqno ack if we
229             // are a stream for a pre-existing vBucket to ensure that the
230             // replica can commit any in-flight SyncWrites if no further
231             // SyncWrites are done and no disk snapshots processed by this
232             // replica.
233             if (highPreparedSeqno) {
234                 pushToReadyQ(std::make_unique<SeqnoAcknowledgement>(
235                         opaque_, vb_, highPreparedSeqno));
236             }
237             transitionState(StreamState::Reading);
238         } else {
239             transitionState(StreamState::Dead);
240         }
241         lh.unlock();
242         notifyStreamReady();
243     }
244 }
245 
246 void PassiveStream::reconnectStream(VBucketPtr& vb,
247                                     uint32_t new_opaque,
248                                     uint64_t start_seqno) {
249     /* the stream should send a don't care vb_uuid if start_seqno is 0 */
250     vb_uuid_ = start_seqno ? vb->failovers->getLatestEntry().vb_uuid : 0;
251 
252     snapshot_info_t info = vb->checkpointManager->getSnapshotInfo();
253     if (info.range.getEnd() == info.start) {
254         info.range.setStart(info.start);
255     }
256 
257     auto stream_req_value = createStreamReqValue();
258 
259     {
260         std::lock_guard<std::mutex> lh(streamMutex);
261 
262         snap_start_seqno_ = info.range.getStart();
263         start_seqno_ = info.start;
264         snap_end_seqno_ = info.range.getEnd();
265         last_seqno.reset(start_seqno);
266         // The start_seqno & cur_snapshot_end shouldn't be less than start_seqno
267         // to set it's starting val to start_seqno
268         cur_snapshot_start.reset(start_seqno);
269         cur_snapshot_end.reset(start_seqno);
270 
271         log(spdlog::level::level_enum::info,
272             "({}) Attempting to reconnect stream with opaque {}, start seq "
273             "no {}, end seq no {}, snap start seqno {}, snap end seqno {}, and "
274             "vb manifest uid {}",
275             vb_,
276             new_opaque,
277             start_seqno,
278             end_seqno_,
279             snap_start_seqno_,
280             snap_end_seqno_,
281             stream_req_value.empty() ? "none" : stream_req_value);
282 
283         pushToReadyQ(std::make_unique<StreamRequest>(vb_,
284                                                      new_opaque,
285                                                      flags_,
286                                                      start_seqno,
287                                                      end_seqno_,
288                                                      vb_uuid_,
289                                                      snap_start_seqno_,
290                                                      snap_end_seqno_,
291                                                      stream_req_value));
292     }
293     notifyStreamReady();
294 }
295 
/**
 * Handles one incoming message for this stream: the message is either
 * processed immediately or pushed onto the buffer (to be handled later by
 * processBufferedMessages()).
 *
 * @param dcpResponse the message; ownership moves into the buffer when the
 *                    message is buffered
 * @param ufc flow-control helper; the value returned by ufc.release() is
 *            stored next to a buffered message (it is read back as
 *            message_bytes in processBufferedMessages())
 * @return invalid_arguments if dcpResponse is null;
 *         success if the stream is not active (the message is ignored);
 *         out_of_range if the message seqno is not greater than last_seqno,
 *         or if a snapshot marker has start < last_seqno and
 *         end <= last_seqno;
 *         disconnect if the replication throttle reports Disconnect, or if
 *         in-line processing returned no_memory and doDisconnectOnNoMem() is
 *         set;
 *         temporary_failure if the message was buffered (or dropped because
 *         the stream is no longer active at that point);
 *         otherwise the result of processing the message in-line.
 * @throws std::invalid_argument when an event type that is not valid on this
 *         path is processed in-line
 */
cb::engine_errc PassiveStream::messageReceived(
        std::unique_ptr<DcpResponse> dcpResponse, UpdateFlowControl& ufc) {
    if (!dcpResponse) {
        return cb::engine_errc::invalid_arguments;
    }

    if (!isActive()) {
        // If the Stream isn't active, *but* the object is still receiving
        // messages from the DcpConsumer that means the stream is still
        // registered in the streams map and hence we should ignore any
        // messages (until STREAM_END is received and the stream is removed
        // from the map).
        return cb::engine_errc::success;
    }

    // Messages carrying a seqno must be strictly newer than the last one
    // accepted; snapshot markers (no seqno) are checked against their range.
    auto seqno = dcpResponse->getBySeqno();
    if (seqno) {
        if (uint64_t(*seqno) <= last_seqno.load()) {
            log(spdlog::level::level_enum::warn,
                "({}) Erroneous (out of sequence) message ({}) received, "
                "with opaque: {}, its seqno ({}) is not "
                "greater than last received seqno ({}); "
                "Dropping mutation!",
                vb_,
                dcpResponse->to_string(),
                opaque_,
                *seqno,
                last_seqno.load());
            return cb::engine_errc::out_of_range;
        }
    } else if (dcpResponse->getEvent() == DcpResponse::Event::SnapshotMarker) {
        auto s = static_cast<SnapshotMarker*>(dcpResponse.get());
        uint64_t snapStart = s->getStartSeqno();
        uint64_t snapEnd = s->getEndSeqno();
        if (snapStart < last_seqno.load() && snapEnd <= last_seqno.load()) {
            log(spdlog::level::level_enum::warn,
                "({}) Erroneous snapshot marker received, with "
                "opaque: {}, its start "
                "({}), and end ({}) are less than last "
                "received seqno ({}); Dropping marker!",
                vb_,
                opaque_,
                snapStart,
                snapEnd,
                last_seqno.load());
            return cb::engine_errc::out_of_range;
        }
    }

    switch (engine->getReplicationThrottle().getStatus()) {
    case ReplicationThrottle::Status::Disconnect:
        log(spdlog::level::level_enum::warn,
            "{} Disconnecting the connection as there is "
            "no memory to complete replication",
            vb_);
        return cb::engine_errc::disconnect;
    case ReplicationThrottle::Status::Process:
        // In-line processing is only allowed while nothing is buffered (and
        // buffering is not forced); otherwise fall out of the switch and
        // buffer the message behind the ones already queued.
        if (buffer.empty() && !alwaysBufferOperations) {
            /* Process the response here itself rather than buffering it */
            cb::engine_errc ret = cb::engine_errc::success;
            switch (dcpResponse->getEvent()) {
            case DcpResponse::Event::Mutation:
                ret = processMutation(static_cast<MutationConsumerMessage*>(
                        dcpResponse.get()));
                break;
            case DcpResponse::Event::Deletion:
                ret = processDeletion(static_cast<MutationConsumerMessage*>(
                        dcpResponse.get()));
                break;
            case DcpResponse::Event::Expiration:
                ret = processExpiration(static_cast<MutationConsumerMessage*>(
                        dcpResponse.get()));
                break;
            case DcpResponse::Event::Prepare:
                ret = processPrepare(static_cast<MutationConsumerMessage*>(
                        dcpResponse.get()));
                break;
            case DcpResponse::Event::Commit:
                ret = processCommit(
                        static_cast<CommitSyncWriteConsumer&>(*dcpResponse));
                break;
            case DcpResponse::Event::Abort:
                ret = processAbort(
                        dynamic_cast<AbortSyncWriteConsumer&>(*dcpResponse));
                break;
            case DcpResponse::Event::SnapshotMarker:
                processMarker(static_cast<SnapshotMarker*>(dcpResponse.get()));
                break;
            case DcpResponse::Event::SetVbucket:
                processSetVBucketState(
                        static_cast<SetVBucketState*>(dcpResponse.get()));
                break;
            case DcpResponse::Event::StreamEnd: {
                std::lock_guard<std::mutex> lh(streamMutex);
                transitionState(StreamState::Dead);
            } break;
            case DcpResponse::Event::SystemEvent: {
                ret = processSystemEvent(
                        *static_cast<SystemEventMessage*>(dcpResponse.get()));
                break;
            }
            case DcpResponse::Event::StreamReq:
            case DcpResponse::Event::AddStream:
            case DcpResponse::Event::SeqnoAcknowledgement:
            case DcpResponse::Event::OSOSnapshot:
            case DcpResponse::Event::SeqnoAdvanced:
                // These are invalid events for this path, they are handled by
                // the DcpConsumer class
                throw std::invalid_argument(
                        "PassiveStream::messageReceived invalid event type:" +
                        std::string(dcpResponse->to_string()));
            }

            if (ret == cb::engine_errc::no_memory) {
                if (engine->getReplicationThrottle().doDisconnectOnNoMem()) {
                    log(spdlog::level::level_enum::warn,
                        "{} Disconnecting the connection as there is no "
                        "memory to complete replication; process dcp "
                        "event returned no memory",
                        vb_);
                    return cb::engine_errc::disconnect;
                }
            }
            // Only a successfully processed message advances last_seqno.
            if (ret == cb::engine_errc::success && seqno) {
                last_seqno.store(*seqno);
            }
            // temporary_failure / no_memory fall through to the buffering
            // code below so the message is retried later; every other result
            // is final.
            if (ret != cb::engine_errc::temporary_failure &&
                ret != cb::engine_errc::no_memory) {
                return ret;
            }
        }
        break;
    case ReplicationThrottle::Status::Pause:
        /* Do nothing specific here, we buffer item for this case and
           other cases below */
        break;
    }

    // Only buffer if the stream is not dead
    if (isActive()) {
        buffer.push({std::move(dcpResponse), ufc.release()});
    }
    return cb::engine_errc::temporary_failure;
}
440 
/**
 * Processes up to batchSize messages from the front of the buffer.
 *
 * @param processed_bytes [out] sum of message_bytes of the messages removed
 *        from the buffer by this call (messages whose processing returned
 *        out_of_range are not counted), plus whatever clearBuffer_UNLOCKED()
 *        returns if the stream turns out to be dead
 * @param batchSize maximum number of messages to process in this call
 * @return all_processed if the loop ended without a failure (or the stream
 *         was found dead and the buffer cleared);
 *         stop_processing if a message returned no_memory and
 *         doDisconnectOnNoMem() is set;
 *         cannot_process for any other temporary_failure / no_memory result
 * @throws std::invalid_argument when a buffered message has an event type
 *         that is not valid on this path
 */
process_items_error_t PassiveStream::processBufferedMessages(
        uint32_t& processed_bytes, size_t batchSize) {
    std::unique_lock<std::mutex> lh(buffer.bufMutex);
    uint32_t count = 0;
    uint32_t total_bytes_processed = 0;
    bool failed = false, noMem = false;

    while (count < batchSize && !buffer.messages.empty()) {
        cb::engine_errc ret = cb::engine_errc::success;
        /* If the stream is in dead state we should not process any remaining
           items in the buffer, we should rather clear them */
        if (!isActive()) {
            total_bytes_processed += clearBuffer_UNLOCKED();
            processed_bytes = total_bytes_processed;
            return all_processed;
        }

        // MB-31410: The front-end thread can process new incoming messages
        // only /after/ all the buffered ones have been processed.
        // So, here the message is taken out with moveFromFront() and
        // pop_front() is only called further down, /after/ we have processed
        // it.
        // That is because the front-end thread checks if buffer.empty() for
        // deciding if it's time to start again processing new incoming
        // mutations. That happens in PassiveStream::messageReceived.
        // NOTE(review): this relies on moveFromFront() leaving the front
        // entry queued so that buffer.empty() stays false in the meantime -
        // confirm against the buffer implementation.
        auto [response, message_bytes] = buffer.moveFromFront(lh);

        // Release bufMutex whilst we attempt to process the message
        // a lock inversion exists with connManager if we hold this.
        lh.unlock();

        // MB-31410: Only used for testing
        processBufferedMessages_postFront_Hook();

        auto seqno = response->getBySeqno();

        switch (response->getEvent()) {
        case DcpResponse::Event::Mutation:
            ret = processMutation(
                    static_cast<MutationConsumerMessage*>(response.get()));
            break;
        case DcpResponse::Event::Deletion:
            ret = processDeletion(
                    static_cast<MutationConsumerMessage*>(response.get()));
            break;
        case DcpResponse::Event::Expiration:
            ret = processExpiration(
                    static_cast<MutationConsumerMessage*>(response.get()));
            break;
        case DcpResponse::Event::Prepare:
            ret = processPrepare(
                    static_cast<MutationConsumerMessage*>(response.get()));
            break;
        case DcpResponse::Event::Commit:
            ret = processCommit(
                    static_cast<CommitSyncWriteConsumer&>(*response));
            break;
        case DcpResponse::Event::Abort:
            ret = processAbort(
                    dynamic_cast<AbortSyncWriteConsumer&>(*response));
            break;
        case DcpResponse::Event::SnapshotMarker:
            processMarker(static_cast<SnapshotMarker*>(response.get()));
            break;
        case DcpResponse::Event::SetVbucket:
            processSetVBucketState(
                    static_cast<SetVBucketState*>(response.get()));
            break;
        case DcpResponse::Event::StreamEnd: {
            std::lock_guard<std::mutex> slh(streamMutex);
            transitionState(StreamState::Dead);
        } break;
        case DcpResponse::Event::SystemEvent: {
            ret = processSystemEvent(
                    *static_cast<SystemEventMessage*>(response.get()));
            break;
        }
        case DcpResponse::Event::StreamReq:
        case DcpResponse::Event::AddStream:
        case DcpResponse::Event::SeqnoAcknowledgement:
        case DcpResponse::Event::OSOSnapshot:
        case DcpResponse::Event::SeqnoAdvanced:
            // These are invalid events for this path, they are handled by the
            // DcpConsumer class
            throw std::invalid_argument(
                    "PassiveStream::processBufferedMessages invalid event "
                    "type:" +
                    std::string(response->to_string()));
        }

        if (ret == cb::engine_errc::temporary_failure ||
            ret == cb::engine_errc::no_memory) {
            failed = true;
            if (ret == cb::engine_errc::no_memory) {
                noMem = true;
            }
        }

        // If we failed and the stream is not dead, just break the loop and
        // return. We will try again with processing the message at the next
        // run.
        // Note: bufMutex was released above, so it is re-acquired here before
        // the message is given back to the front of the buffer queue with
        // moveToFront().
        if (failed && isActive()) {
            lh.lock();
            // isActive should be false if the queue was emptied, but check
            // anyway so we're more robust against any future code changes to
            // isActive and closeStream
            if (!buffer.messages.empty()) {
                buffer.moveToFront(lh, {std::move(response), message_bytes});
            }
            lh.unlock();
            break;
        }

        // At this point we are done with the message (it was processed, or
        // the stream is no longer active), so we can remove it from the
        // buffer.
        // Note: we need to re-acquire bufMutex to update the buffer safely
        lh.lock();
        buffer.pop_front(lh);

        count++;
        // Bytes of messages rejected as out_of_range are not added to the
        // processed total.
        if (ret != cb::engine_errc::out_of_range) {
            total_bytes_processed += message_bytes;
        }
        // Only a successfully processed message advances last_seqno.
        if (ret == cb::engine_errc::success && seqno) {
            last_seqno.store(*seqno);
        }
    }

    processed_bytes = total_bytes_processed;

    if (failed) {
        if (noMem && engine->getReplicationThrottle().doDisconnectOnNoMem()) {
            log(spdlog::level::level_enum::warn,
                "{} Processor task indicating disconnection as "
                "there is no memory to complete replication; process dcp "
                "event returned no memory ",
                vb_);
            return stop_processing;
        }
        return cannot_process;
    }

    return all_processed;
}
588 
589 cb::engine_errc PassiveStream::processMessage(MutationConsumerMessage* message,
590                                               MessageType messageType) {
591     std::array<std::string, 4> taskToString{
592             {"mutation", "deletion", "expiration", "prepare"}};
593     VBucketPtr vb = engine->getVBucket(vb_);
594     if (!vb) {
595         return cb::engine_errc::not_my_vbucket;
596     }
597 
598     auto consumer = consumerPtr.lock();
599     if (!consumer) {
600         return cb::engine_errc::disconnect;
601     }
602 
603     if (uint64_t(*message->getBySeqno()) < cur_snapshot_start.load() ||
604         uint64_t(*message->getBySeqno()) > cur_snapshot_end.load()) {
605         log(spdlog::level::level_enum::warn,
606             "({}) Erroneous {} [sequence "
607             "number does not fall in the expected snapshot range : "
608             "{{snapshot_start ({}) <= seq_no ({}) <= "
609             "snapshot_end ({})]; Dropping the {}!",
610             vb_,
611             taskToString[messageType],
612             cur_snapshot_start.load(),
613             *message->getBySeqno(),
614             cur_snapshot_end.load(),
615             taskToString[messageType]);
616         return cb::engine_errc::out_of_range;
617     }
618 
619     switch (messageType) {
620     case MessageType::Mutation:
621         // Skip
622         break;
623     case MessageType::Deletion:
624     case MessageType::Expiration:
625         // The deleted value has a body, send it through the mutation path so we
626         // set the deleted item with a value
627         if (message->getItem()->getNBytes()) {
628             return processMessage(message, MessageType::Mutation);
629         }
630         break;
631     case MessageType::Prepare:
632         // No extra processing.
633         break;
634     }
635 
636     // MB-17517: Check for the incoming item's CAS validity. We /shouldn't/
637     // receive anything without a valid CAS, however given that versions without
638     // this check may send us "bad" CAS values, we should regenerate them (which
639     // is better than rejecting the data entirely).
640     if (!Item::isValidCas(message->getItem()->getCas())) {
641         log(spdlog::level::level_enum::warn,
642             "Invalid CAS ({:#x}) received for {} {{{}, seqno:{}}}. "
643             "Regenerating new CAS",
644             message->getItem()->getCas(),
645             taskToString[messageType],
646             vb_,
647             message->getItem()->getBySeqno());
648         message->getItem()->setCas();
649     }
650 
651     cb::engine_errc ret;
652     DeleteSource deleteSource = DeleteSource::Explicit;
653     bool switchComplete = false;
654     switch (messageType) {
655     case MessageType::Mutation:
656 
657         ret = engine->getKVBucket()->setWithMeta(*message->getItem(),
658                                                  0,
659                                                  nullptr,
660                                                  consumer->getCookie(),
661                                                  {vbucket_state_active,
662                                                   vbucket_state_replica,
663                                                   vbucket_state_pending},
664                                                  CheckConflicts::No,
665                                                  true,
666                                                  GenerateBySeqno::No,
667                                                  GenerateCas::No,
668                                                  message->getExtMetaData());
669 
670         switchComplete = true;
671         break;
672     case MessageType::Prepare:
673         ret = engine->getKVBucket()->prepare(*message->getItem(),
674                                              consumer->getCookie());
675         // If the the stream has received and successfully processed a pending
676         // SyncWrite, then we have to flag that the Replica must notify the
677         // DurabilityMonitor at snapshot-end received for the DM to move the
678         // HighPreparedSeqno.
679         if (ret == cb::engine_errc::success) {
680             cur_snapshot_prepare.store(true);
681         }
682 
683         switchComplete = true;
684         break;
685     case MessageType::Expiration:
686         deleteSource = DeleteSource::TTL;
687     // fallthrough with deleteSource updated
688     case MessageType::Deletion:
689         uint64_t delCas = 0;
690         ItemMetaData meta = message->getItem()->getMetaData();
691         ret = engine->getKVBucket()->deleteWithMeta(
692                 message->getItem()->getKey(),
693                 delCas,
694                 nullptr,
695                 message->getVBucket(),
696                 consumer->getCookie(),
697                 {vbucket_state_active,
698                  vbucket_state_replica,
699                  vbucket_state_pending},
700                 CheckConflicts::No,
701                 meta,
702                 GenerateBySeqno::No,
703                 GenerateCas::No,
704                 *message->getBySeqno(),
705                 message->getExtMetaData(),
706                 deleteSource);
707         if (ret == cb::engine_errc::no_such_key) {
708             ret = cb::engine_errc::success;
709         }
710         switchComplete = true;
711         break;
712     }
713     if (!switchComplete) {
714         throw std::logic_error(
715                 std::string("PassiveStream::processMessage: "
716                             "Message type not supported"));
717     }
718 
719     if (ret != cb::engine_errc::success) {
720         // ENOMEM logging is handled by maybeLogMemoryState
721         if (ret != cb::engine_errc::no_memory) {
722             log(spdlog::level::level_enum::warn,
723                 "{} Got error '{}' while trying to process "
724                 "{} with seqno:{} cid:{}",
725                 vb_,
726                 cb::to_string(ret),
727                 taskToString[messageType],
728                 message->getItem()->getBySeqno(),
729                 message->getItem()->getKey().getCollectionID());
730         }
731     } else {
732         handleSnapshotEnd(vb, *message->getBySeqno());
733     }
734 
735     maybeLogMemoryState(
736             ret, taskToString[messageType], message->getItem()->getBySeqno());
737 
738     return ret;
739 }
740 
741 cb::engine_errc PassiveStream::processMutation(
742         MutationConsumerMessage* mutation) {
743     return processMessage(mutation, MessageType::Mutation);
744 }
745 
746 cb::engine_errc PassiveStream::processDeletion(
747         MutationConsumerMessage* deletion) {
748     return processMessage(deletion, MessageType::Deletion);
749 }
750 
751 cb::engine_errc PassiveStream::processExpiration(
752         MutationConsumerMessage* expiration) {
753     return processMessage(expiration, MessageType::Expiration);
754 }
755 
756 cb::engine_errc PassiveStream::processPrepare(
757         MutationConsumerMessage* prepare) {
758     auto result = processMessage(prepare, MessageType::Prepare);
759     if (result == cb::engine_errc::success) {
760         Expects(prepare->getItem()->getBySeqno() ==
761                 engine->getVBucket(vb_)->getHighSeqno());
762     }
763     return result;
764 }
765 
766 void PassiveStream::seqnoAck(int64_t seqno) {
767     // Only send a seqnoAck if we have an active stream that the producer has
768     // responded with Success to the stream request
769     if (!isActive() || isPending()) {
770         log(spdlog::level::level_enum::warn,
771             "{} Could not ack seqno {} because stream was in StreamState:{} "
772             "Expected it to be in state {}",
773             vb_,
774             seqno,
775             to_string(state_.load()),
776             to_string(StreamState::Reading));
777         return;
778     }
779 
780     {
781         std::lock_guard<std::mutex> lh(streamMutex);
782         if (!isActive()) {
783             return;
784         }
785 
786         pushToReadyQ(
787                 std::make_unique<SeqnoAcknowledgement>(opaque_, vb_, seqno));
788     }
789     notifyStreamReady();
790 }
791 
792 std::string PassiveStream::to_string(StreamState st) {
793     switch (st) {
794     case StreamState::Pending:
795         return "pending";
796     case StreamState::Reading:
797         return "reading";
798     case StreamState::Dead:
799         return "dead";
800     }
801     throw std::invalid_argument("PassiveStream::to_string(StreamState): " +
802                                 std::to_string(int(st)));
803 }
804 
805 cb::engine_errc PassiveStream::processCommit(
806         const CommitSyncWriteConsumer& commit) {
807     VBucketPtr vb = engine->getVBucket(vb_);
808 
809     if (!vb) {
810         return cb::engine_errc::not_my_vbucket;
811     }
812 
813     // The state of the VBucket should never change during a commit, because
814     // VBucket::commit() may generated expired items.
815     // NOTE: Theoretically this will never occur, because we kill all streams
816     // when changing the VBucket state.
817     folly::SharedMutex::ReadHolder rlh(vb->getStateLock());
818 
819     auto rv = vb->commit(rlh,
820                          commit.getKey(),
821                          commit.getPreparedSeqno(),
822                          *commit.getBySeqno(),
823                          vb->lockCollections(commit.getKey()));
824     if (rv != cb::engine_errc::success) {
825         log(spdlog::level::level_enum::warn,
826             "PassiveStream::processCommit: {} Got error '{}' while trying to "
827             "process commit",
828             vb_,
829             cb::to_string(rv));
830     } else {
831         handleSnapshotEnd(vb, *commit.getBySeqno());
832     }
833 
834     return rv;
835 }
836 
837 cb::engine_errc PassiveStream::processAbort(
838         const AbortSyncWriteConsumer& abort) {
839     VBucketPtr vb = engine->getVBucket(vb_);
840 
841     if (!vb) {
842         return cb::engine_errc::not_my_vbucket;
843     }
844 
845     // The state of the VBucket should never change during an abort, because
846     // VBucket::abort() may generated expired items.
847     // NOTE: Theoretically this will never occur, because we kill all streams
848     // when changing the VBucket state.
849     folly::SharedMutex::ReadHolder rlh(vb->getStateLock());
850 
851     auto rv = vb->abort(rlh,
852                         abort.getKey(),
853                         abort.getPreparedSeqno(),
854                         abort.getAbortSeqno(),
855                         vb->lockCollections(abort.getKey()));
856 
857     if (rv != cb::engine_errc::success) {
858         log(spdlog::level::level_enum::warn,
859             "PassiveStream::processAbort: {} Got error '{}' while trying to "
860             "process abort",
861             vb_,
862             cb::to_string(rv));
863     } else {
864         handleSnapshotEnd(vb, *abort.getBySeqno());
865     }
866 
867     return rv;
868 }
869 
870 cb::engine_errc PassiveStream::processSystemEvent(
871         const SystemEventMessage& event) {
872     VBucketPtr vb = engine->getVBucket(vb_);
873 
874     if (!vb) {
875         return cb::engine_errc::not_my_vbucket;
876     }
877 
878     cb::engine_errc rv = cb::engine_errc::success;
879     // Depending on the event, extras is different and key may even be empty
880     // The specific handler will know how to interpret.
881     switch (event.getSystemEvent()) {
882     case mcbp::systemevent::id::CreateCollection: {
883         rv = processCreateCollection(*vb, CreateCollectionEvent(event));
884         break;
885     }
886     case mcbp::systemevent::id::DeleteCollection: {
887         rv = processDropCollection(*vb, DropCollectionEvent(event));
888         break;
889     }
890     case mcbp::systemevent::id::CreateScope: {
891         rv = processCreateScope(*vb, CreateScopeEvent(event));
892         break;
893     }
894     case mcbp::systemevent::id::DropScope: {
895         rv = processDropScope(*vb, DropScopeEvent(event));
896         break;
897     }
898     default: {
899         rv = cb::engine_errc::invalid_arguments;
900         break;
901     }
902     }
903 
904     if (rv != cb::engine_errc::success) {
905         log(spdlog::level::level_enum::warn,
906             "{} Got error '{}' while trying to process "
907             "system event",
908             vb_,
909             cb::to_string(rv));
910     } else {
911         handleSnapshotEnd(vb, *event.getBySeqno());
912     }
913 
914     return rv;
915 }
916 
917 cb::engine_errc PassiveStream::processCreateCollection(
918         VBucket& vb, const CreateCollectionEvent& event) {
919     try {
920         vb.replicaCreateCollection(
921                 event.getManifestUid(),
922                 {event.getScopeID(), event.getCollectionID()},
923                 event.getKey(),
924                 event.getMaxTtl(),
925                 event.getBySeqno());
926     } catch (std::exception& e) {
927         log(spdlog::level::level_enum::warn,
928             "PassiveStream::processCreateCollection {} exception {}",
929             vb.getId(),
930             e.what());
931         return cb::engine_errc::invalid_arguments;
932     }
933     return cb::engine_errc::success;
934 }
935 
936 cb::engine_errc PassiveStream::processDropCollection(
937         VBucket& vb, const DropCollectionEvent& event) {
938     try {
939         vb.replicaDropCollection(event.getManifestUid(),
940                                  event.getCollectionID(),
941                                  event.getBySeqno());
942     } catch (std::exception& e) {
943         log(spdlog::level::level_enum::warn,
944             "PassiveStream::processDropCollection {} exception {}",
945             vb.getId(),
946             e.what());
947         return cb::engine_errc::invalid_arguments;
948     }
949     return cb::engine_errc::success;
950 }
951 
952 cb::engine_errc PassiveStream::processCreateScope(
953         VBucket& vb, const CreateScopeEvent& event) {
954     try {
955         vb.replicaCreateScope(event.getManifestUid(),
956                               event.getScopeID(),
957                               event.getKey(),
958                               event.getBySeqno());
959     } catch (std::exception& e) {
960         log(spdlog::level::level_enum::warn,
961             "PassiveStream::processCreateScope {} exception {}",
962             vb.getId(),
963             e.what());
964         return cb::engine_errc::invalid_arguments;
965     }
966     return cb::engine_errc::success;
967 }
968 
969 cb::engine_errc PassiveStream::processDropScope(VBucket& vb,
970                                                 const DropScopeEvent& event) {
971     try {
972         vb.replicaDropScope(
973                 event.getManifestUid(), event.getScopeID(), event.getBySeqno());
974     } catch (std::exception& e) {
975         log(spdlog::level::level_enum::warn,
976             "PassiveStream::processDropScope {} exception {}",
977             vb.getId(),
978             e.what());
979         return cb::engine_errc::invalid_arguments;
980     }
981     return cb::engine_errc::success;
982 }
983 
984 void PassiveStream::processMarker(SnapshotMarker* marker) {
985     VBucketPtr vb = engine->getVBucket(vb_);
986 
987     // cur_snapshot_start is initialised to 0 so only set it for numbers > 0,
988     // as the first snapshot maybe have a snap_start_seqno of 0.
989     if (marker->getStartSeqno() > 0) {
990         cur_snapshot_start.store(marker->getStartSeqno());
991     }
992     cur_snapshot_end.store(marker->getEndSeqno());
993     const auto prevSnapType = cur_snapshot_type.load();
994     cur_snapshot_type.store((marker->getFlags() & MARKER_FLAG_DISK)
995                                     ? Snapshot::Disk
996                                     : Snapshot::Memory);
997 
998     if (vb) {
999         auto checkpointType = marker->getFlags() & MARKER_FLAG_DISK
1000                                       ? CheckpointType::Disk
1001                                       : CheckpointType::Memory;
1002 
1003         // Check whether the snapshot can be considered as an initial disk
1004         // checkpoint for the replica.
1005         if (checkpointType == CheckpointType::Disk && vb->getHighSeqno() == 0) {
1006             checkpointType = CheckpointType::InitialDisk;
1007         }
1008 
1009         auto& ckptMgr = *vb->checkpointManager;
1010 
1011         std::optional<uint64_t> hcs = marker->getHighCompletedSeqno();
1012         if ((marker->getFlags() & MARKER_FLAG_DISK) &&
1013             !supportsSyncReplication) {
1014             // If this stream doesn't support SyncReplication (i.e. the producer
1015             // is a pre-MadHatter version) then we should consider the HCS to be
1016             // present but zero for disk snapshot (not possible for any
1017             // SyncWrites to have completed yet). If SyncReplication is
1018             // supported then use the value from the marker.
1019             hcs = 0;
1020         }
1021 
1022         if (marker->getFlags() & MARKER_FLAG_DISK) {
1023             // A replica could receive a duplicate DCP prepare during a disk
1024             // snapshot if it had previously received an uncompleted prepare.
1025             // We can receive a disk snapshot when we either:
1026             //     a) First connect
1027             //     b) Get cursor dropped by the active
1028             //
1029             // We selectively allow these prepares to overwrite the old one by
1030             // setting a duplicate prepare window in the vBucket. This will
1031             // allow any currently outstanding prepares to be overwritten, but
1032             // not any new ones.
1033             vb->setDuplicatePrepareWindow();
1034         }
1035 
1036         // We could be connected to a non sync-repl, so if the max-visible is
1037         // not transmitted (optional is false), set visible to snap-end
1038         auto visibleSeq =
1039                 marker->getMaxVisibleSeqno().value_or(marker->getEndSeqno());
1040 
1041         if (checkpointType == CheckpointType::InitialDisk) {
1042             // Case: receiving the first snapshot in a Disk snapshot.
1043             // Note that replica may never execute here as the active may switch
1044             // directly to in-memory and send the first snapshot in a Memory
1045             // snapshot.
1046 
1047             vb->setReceivingInitialDiskSnapshot(true);
1048             ckptMgr.createSnapshot(cur_snapshot_start.load(),
1049                                    cur_snapshot_end.load(),
1050                                    hcs,
1051                                    checkpointType,
1052                                    visibleSeq);
1053         } else {
1054             // Case: receiving any type of snapshot (Disk/Memory).
1055 
1056             if (marker->getFlags() & MARKER_FLAG_CHK) {
1057                 ckptMgr.createSnapshot(cur_snapshot_start.load(),
1058                                        cur_snapshot_end.load(),
1059                                        hcs,
1060                                        checkpointType,
1061                                        visibleSeq);
1062             } else {
1063                 // MB-42780: In general we cannot merge multiple snapshots into
1064                 // the same checkpoint. The only exception is for when replica
1065                 // receives multiple Memory checkpoints in a row.
1066                 // Since 6.5.0 the Active behaves correctly with regard to that
1067                 // (ie, the Active always sets the MARKER_FLAG_CHK in a snapshot
1068                 // transition tha involves Disk snapshots), but older Producers
1069                 // may still miss the MARKER_FLAG_CHK.
1070                 if (prevSnapType == Snapshot::Memory &&
1071                     cur_snapshot_type == Snapshot::Memory) {
1072                     ckptMgr.extendOpenCheckpoint(cur_snapshot_end.load(),
1073                                                  visibleSeq);
1074                 } else {
1075                     ckptMgr.createSnapshot(cur_snapshot_start.load(),
1076                                            cur_snapshot_end.load(),
1077                                            hcs,
1078                                            checkpointType,
1079                                            visibleSeq);
1080                 }
1081             }
1082         }
1083 
1084         if (marker->getFlags() & MARKER_FLAG_ACK) {
1085             cur_snapshot_ack = true;
1086         }
1087     }
1088 }
1089 
1090 void PassiveStream::processSetVBucketState(SetVBucketState* state) {
1091     engine->getKVBucket()->setVBucketState(
1092             vb_, state->getState(), {}, TransferVB::Yes);
1093     {
1094         std::lock_guard<std::mutex> lh(streamMutex);
1095         pushToReadyQ(std::make_unique<SetVBucketStateResponse>(
1096                 opaque_, cb::mcbp::Status::Success));
1097     }
1098     notifyStreamReady();
1099 }
1100 
1101 void PassiveStream::handleSnapshotEnd(VBucketPtr& vb, uint64_t byseqno) {
1102     if (byseqno == cur_snapshot_end.load()) {
1103         if (cur_snapshot_type.load() == Snapshot::Disk) {
1104             vb->setReceivingInitialDiskSnapshot(false);
1105         }
1106 
1107         if (cur_snapshot_ack) {
1108             {
1109                 std::lock_guard<std::mutex> lh(streamMutex);
1110                 pushToReadyQ(std::make_unique<SnapshotMarkerResponse>(
1111                         opaque_, cb::mcbp::Status::Success));
1112             }
1113             notifyStreamReady();
1114             cur_snapshot_ack = false;
1115         }
1116 
1117         // Notify the PassiveDM that the snapshot-end mutation has been
1118         // received on PassiveStream, if the snapshot contains at least one
1119         // Prepare. That is necessary for unblocking the High Prepared Seqno
1120         // in PassiveDM. Note that the HPS is what the PassiveDM acks back to
1121         // the Active. See comments in PassiveDM for details.
1122 
1123         // Disk snapshots are subject to deduplication, and may be missing
1124         // purged aborts. We must notify the PDM even if we have not seen a
1125         // prepare, to account for possible unseen prepares.
1126         if (cur_snapshot_prepare ||
1127             cur_snapshot_type.load() == Snapshot::Disk) {
1128             vb->notifyPassiveDMOfSnapEndReceived(byseqno);
1129             cur_snapshot_prepare.store(false);
1130         }
1131     }
1132 }
1133 
1134 void PassiveStream::addStats(const AddStatFn& add_stat, CookieIface& c) {
1135     Stream::addStats(add_stat, c);
1136 
1137     try {
1138         std::array<char, 1024> buf;
1139         size_t bufferItems = 0;
1140         size_t bufferBytes = 0;
1141         {
1142             std::lock_guard<std::mutex> lg(buffer.bufMutex);
1143             bufferItems = buffer.messages.size();
1144             bufferBytes = buffer.bytes;
1145         }
1146         checked_snprintf(buf.data(),
1147                          buf.size(),
1148                          "%s:stream_%d_buffer_items",
1149                          name_.c_str(),
1150                          vb_.get());
1151         add_casted_stat(buf.data(), bufferItems, add_stat, c);
1152         checked_snprintf(buf.data(),
1153                          buf.size(),
1154                          "%s:stream_%d_buffer_bytes",
1155                          name_.c_str(),
1156                          vb_.get());
1157         add_casted_stat(buf.data(), bufferBytes, add_stat, c);
1158         checked_snprintf(buf.data(),
1159                          buf.size(),
1160                          "%s:stream_%d_last_received_seqno",
1161                          name_.c_str(),
1162                          vb_.get());
1163         add_casted_stat(buf.data(), last_seqno.load(), add_stat, c);
1164         checked_snprintf(buf.data(),
1165                          buf.size(),
1166                          "%s:stream_%d_ready_queue_memory",
1167                          name_.c_str(),
1168                          vb_.get());
1169         add_casted_stat(buf.data(), getReadyQueueMemory(), add_stat, c);
1170 
1171         checked_snprintf(buf.data(),
1172                          buf.size(),
1173                          "%s:stream_%d_cur_snapshot_type",
1174                          name_.c_str(),
1175                          vb_.get());
1176         add_casted_stat(
1177                 buf.data(), ::to_string(cur_snapshot_type.load()), add_stat, c);
1178 
1179         if (cur_snapshot_type.load() != Snapshot::None) {
1180             checked_snprintf(buf.data(),
1181                              buf.size(),
1182                              "%s:stream_%d_cur_snapshot_start",
1183                              name_.c_str(),
1184                              vb_.get());
1185             add_casted_stat(buf.data(), cur_snapshot_start.load(), add_stat, c);
1186             checked_snprintf(buf.data(),
1187                              buf.size(),
1188                              "%s:stream_%d_cur_snapshot_end",
1189                              name_.c_str(),
1190                              vb_.get());
1191             add_casted_stat(buf.data(), cur_snapshot_end.load(), add_stat, c);
1192         }
1193 
1194         checked_snprintf(buf.data(),
1195                          buf.size(),
1196                          "%s:stream_%d_cur_snapshot_prepare",
1197                          name_.c_str(),
1198                          vb_.get());
1199         add_casted_stat(buf.data(), cur_snapshot_prepare.load(), add_stat, c);
1200 
1201         auto stream_req_value = createStreamReqValue();
1202 
1203         if (!stream_req_value.empty()) {
1204             checked_snprintf(buf.data(),
1205                              buf.size(),
1206                              "%s:stream_%d_vb_manifest_uid",
1207                              name_.c_str(),
1208                              vb_.get());
1209             add_casted_stat(buf.data(), stream_req_value.c_str(), add_stat, c);
1210         }
1211 
1212     } catch (std::exception& error) {
1213         EP_LOG_WARN("PassiveStream::addStats: Failed to build stats: {}",
1214                     error.what());
1215     }
1216 }
1217 
1218 std::unique_ptr<DcpResponse> PassiveStream::next() {
1219     std::lock_guard<std::mutex> lh(streamMutex);
1220 
1221     if (readyQ.empty()) {
1222         itemsReady.store(false);
1223         return nullptr;
1224     }
1225 
1226     return popFromReadyQ();
1227 }
1228 
1229 uint32_t PassiveStream::clearBuffer_UNLOCKED() {
1230     uint32_t unackedBytes = buffer.bytes;
1231     buffer.messages.clear();
1232     buffer.bytes = 0;
1233     return unackedBytes;
1234 }
1235 
1236 bool PassiveStream::transitionState(StreamState newState) {
1237     log(spdlog::level::level_enum::debug,
1238         "PassiveStream::transitionState: ({}) "
1239         "Transitioning from {} to {}",
1240         vb_,
1241         to_string(state_.load()),
1242         to_string(newState));
1243 
1244     if (state_ == newState) {
1245         return false;
1246     }
1247 
1248     bool validTransition = false;
1249     switch (state_.load()) {
1250     case StreamState::Pending:
1251         if (newState == StreamState::Reading || newState == StreamState::Dead) {
1252             validTransition = true;
1253         }
1254         break;
1255     case StreamState::Reading:
1256         if (newState == StreamState::Dead) {
1257             validTransition = true;
1258         }
1259         break;
1260 
1261     case StreamState::Dead:
1262         // Once 'dead' shouldn't transition away from it.
1263         break;
1264     }
1265 
1266     if (!validTransition) {
1267         throw std::invalid_argument(
1268                 "PassiveStream::transitionState:"
1269                 " newState (which is" +
1270                 to_string(newState) +
1271                 ") is not valid for current state (which is " +
1272                 to_string(state_.load()) + ")");
1273     }
1274 
1275     state_ = newState;
1276     return true;
1277 }
1278 
1279 void PassiveStream::notifyStreamReady() {
1280     auto consumer = consumerPtr.lock();
1281     if (!consumer) {
1282         return;
1283     }
1284 
1285     bool inverse = false;
1286     if (itemsReady.compare_exchange_strong(inverse, true)) {
1287         consumer->notifyStreamReady(vb_);
1288     }
1289 }
1290 
1291 const std::string PassiveStream::createStreamReqValue() const {
1292     nlohmann::json stream_req_json;
1293     std::ostringstream ostr;
1294     ostr << std::hex << static_cast<uint64_t>(vb_manifest_uid);
1295     stream_req_json["uid"] = ostr.str();
1296     return stream_req_json.dump();
1297 }
1298 
1299 template <typename... Args>
1300 void PassiveStream::log(spdlog::level::level_enum severity,
1301                         const char* fmt,
1302                         Args... args) const {
1303     auto consumer = consumerPtr.lock();
1304     if (consumer) {
1305         consumer->getLogger().log(severity, fmt, args...);
1306     } else {
1307         if (getGlobalBucketLogger()->should_log(severity)) {
1308             getGlobalBucketLogger()->log(
1309                     severity,
1310                     std::string{passiveStreamLoggingPrefix}.append(fmt).data(),
1311                     args...);
1312         }
1313     }
1314 }
1315 
1316 void PassiveStream::maybeLogMemoryState(cb::engine_errc status,
1317                                         const std::string& msgType,
1318                                         int64_t seqno) {
1319     bool previousNoMem = isNoMemory.load();
1320     if (status == cb::engine_errc::no_memory && !previousNoMem) {
1321         log(spdlog::level::level_enum::warn,
1322             "{} Got error '{}' while trying to process "
1323             "{} with seqno:{}",
1324             vb_,
1325             cb::to_string(status),
1326             msgType,
1327             seqno);
1328         isNoMemory.store(true);
1329     } else if (status == cb::engine_errc::success && previousNoMem) {
1330         log(spdlog::level::level_enum::info,
1331             "{} PassiveStream resuming after no-memory backoff",
1332             vb_);
1333         isNoMemory.store(false);
1334     }
1335 }
1336 
1337 bool PassiveStream::Buffer::empty() const {
1338     std::lock_guard<std::mutex> lh(bufMutex);
1339     return messages.empty();
1340 }
1341 
1342 void PassiveStream::Buffer::push(PassiveStream::Buffer::BufferType bufferItem) {
1343     std::lock_guard<std::mutex> lg(bufMutex);
1344     bytes += bufferItem.second;
1345     messages.emplace_back(std::move(bufferItem));
1346 }
1347 
1348 void PassiveStream::Buffer::pop_front(const std::unique_lock<std::mutex>& lh) {
1349     if (messages.empty()) {
1350         return;
1351     }
1352     if (messages.front().first) {
1353         bytes -= messages.front().second;
1354     }
1355 
1356     messages.pop_front();
1357 }
1358 
1359 PassiveStream::Buffer::BufferType PassiveStream::Buffer::moveFromFront(
1360         const std::unique_lock<std::mutex>& lh) {
1361     bytes -= messages.front().second;
1362     return {std::move(messages.front().first), messages.front().second};
1363 }
1364 
1365 void PassiveStream::Buffer::moveToFront(
1366         const std::unique_lock<std::mutex>& lh,
1367         PassiveStream::Buffer::BufferType bufferItem) {
1368     bytes += bufferItem.second;
1369     messages.front().first = std::move(bufferItem.first);
1370 }
1371 
1372 std::string PassiveStream::Labeller::getLabel(const char* name) const {
1373     return fmt::format("PassiveStream({} {})::{}",
1374                        stream.getVBucket(),
1375                        stream.getName(),
1376                        name);
1377 }