1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2015 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 #include "dcp/response.h"
18 #include "dcp/stream.h"
19 #include "statwriter.h"
20 #include "vbucket.h"
21 
22 #include <platform/checked_snprintf.h>
23 
24 #include <engines/ep/src/bucket_logger.h>
25 #include <memory>
26 
to_string(Stream::Snapshot type)27 const char* to_string(Stream::Snapshot type) {
28     switch (type) {
29     case Stream::Snapshot::None:
30         return "none";
31     case Stream::Snapshot::Disk:
32         return "disk";
33     case Stream::Snapshot::Memory:
34         return "memory";
35     }
36     throw std::logic_error("to_string(Stream::Snapshot): called with invalid "
37             "Snapshot type:" + std::to_string(int(type)));
38 }
39 
40 const uint64_t Stream::dcpMaxSeqno = std::numeric_limits<uint64_t>::max();
41 
Stream(const std::string& name, uint32_t flags, uint32_t opaque, Vbid vb, uint64_t start_seqno, uint64_t end_seqno, uint64_t vb_uuid, uint64_t snap_start_seqno, uint64_t snap_end_seqno)42 Stream::Stream(const std::string& name,
43                uint32_t flags,
44                uint32_t opaque,
45                Vbid vb,
46                uint64_t start_seqno,
47                uint64_t end_seqno,
48                uint64_t vb_uuid,
49                uint64_t snap_start_seqno,
50                uint64_t snap_end_seqno)
51     : name_(name),
52       flags_(flags),
53       opaque_(opaque),
54       vb_(vb),
55       start_seqno_(start_seqno),
56       end_seqno_(end_seqno),
57       vb_uuid_(vb_uuid),
58       snap_start_seqno_(snap_start_seqno),
59       snap_end_seqno_(snap_end_seqno),
60       itemsReady(false),
61       readyQ_non_meta_items(0),
62       readyQueueMemory(0) {
63 }
64 
~Stream()65 Stream::~Stream() {
66     // NB: reusing the "unlocked" method without a lock because we're
67     // destructing and should not take any locks.
68     clear_UNLOCKED();
69 }
70 
clear_UNLOCKED()71 void Stream::clear_UNLOCKED() {
72     while (!readyQ.empty()) {
73         popFromReadyQ();
74     }
75 }
76 
pushToReadyQ(std::unique_ptr<DcpResponse> resp)77 void Stream::pushToReadyQ(std::unique_ptr<DcpResponse> resp) {
78     /* expect streamMutex.ownsLock() == true */
79     if (resp) {
80         if (!resp->isMetaEvent()) {
81             readyQ_non_meta_items++;
82         }
83         readyQueueMemory.fetch_add(resp->getMessageSize(),
84                                    std::memory_order_relaxed);
85         readyQ.push(std::move(resp));
86     }
87 }
88 
popFromReadyQ(void)89 std::unique_ptr<DcpResponse> Stream::popFromReadyQ(void) {
90     /* expect streamMutex.ownsLock() == true */
91     if (!readyQ.empty()) {
92         auto front = std::move(readyQ.front());
93         readyQ.pop();
94 
95         if (!front->isMetaEvent()) {
96             readyQ_non_meta_items--;
97         }
98         const uint32_t respSize = front->getMessageSize();
99 
100         /* Decrement the readyQ size */
101         if (respSize <= readyQueueMemory.load(std::memory_order_relaxed)) {
102             readyQueueMemory.fetch_sub(respSize, std::memory_order_relaxed);
103         } else {
104             EP_LOG_DEBUG(
105                     "readyQ size for stream {} ({}) underflow, likely wrong "
106                     "stat calculation! curr size: {}; new size: {}",
107                     name_.c_str(),
108                     getVBucket(),
109                     readyQueueMemory.load(std::memory_order_relaxed),
110                     respSize);
111             readyQueueMemory.store(0, std::memory_order_relaxed);
112         }
113 
114         return front;
115     }
116 
117     return nullptr;
118 }
119 
getReadyQueueMemory()120 uint64_t Stream::getReadyQueueMemory() {
121     return readyQueueMemory.load(std::memory_order_relaxed);
122 }
123 
addStats(const AddStatFn& add_stat, const void* c)124 void Stream::addStats(const AddStatFn& add_stat, const void* c) {
125     try {
126         const int bsize = 1024;
127         char buffer[bsize];
128         checked_snprintf(
129                 buffer, bsize, "%s:stream_%d_flags", name_.c_str(), vb_.get());
130         add_casted_stat(buffer, flags_, add_stat, c);
131         checked_snprintf(
132                 buffer, bsize, "%s:stream_%d_opaque", name_.c_str(), vb_.get());
133         add_casted_stat(buffer, opaque_, add_stat, c);
134         checked_snprintf(buffer,
135                          bsize,
136                          "%s:stream_%d_start_seqno",
137                          name_.c_str(),
138                          vb_.get());
139         add_casted_stat(buffer, start_seqno_, add_stat, c);
140         checked_snprintf(buffer,
141                          bsize,
142                          "%s:stream_%d_end_seqno",
143                          name_.c_str(),
144                          vb_.get());
145         add_casted_stat(buffer, end_seqno_, add_stat, c);
146         checked_snprintf(buffer,
147                          bsize,
148                          "%s:stream_%d_vb_uuid",
149                          name_.c_str(),
150                          vb_.get());
151         add_casted_stat(buffer, vb_uuid_, add_stat, c);
152         checked_snprintf(buffer,
153                          bsize,
154                          "%s:stream_%d_snap_start_seqno",
155                          name_.c_str(),
156                          vb_.get());
157         add_casted_stat(buffer, snap_start_seqno_, add_stat, c);
158         checked_snprintf(buffer,
159                          bsize,
160                          "%s:stream_%d_snap_end_seqno",
161                          name_.c_str(),
162                          vb_.get());
163         add_casted_stat(buffer, snap_end_seqno_, add_stat, c);
164 
165         checked_snprintf(
166                 buffer, bsize, "%s:stream_%d_state", name_.c_str(), vb_.get());
167         add_casted_stat(buffer, getStateName(), add_stat, c);
168 
169         checked_snprintf(buffer,
170                          bsize,
171                          "%s:stream_%d_items_ready",
172                          name_.c_str(),
173                          vb_.get());
174         add_casted_stat(buffer, itemsReady.load(), add_stat, c);
175 
176         size_t readyQsize;
177         {
178             std::lock_guard<std::mutex> lh(streamMutex);
179             readyQsize = readyQ.size();
180         }
181         checked_snprintf(buffer,
182                          bsize,
183                          "%s:stream_%d_readyQ_items",
184                          name_.c_str(),
185                          vb_.get());
186         add_casted_stat(buffer, readyQsize, add_stat, c);
187     } catch (std::exception& error) {
188         EP_LOG_WARN("Stream::addStats: Failed to build stats: {}",
189                     error.what());
190     }
191 }
192 
clear()193 void Stream::clear() {
194     LockHolder lh(streamMutex);
195     clear_UNLOCKED();
196 }
197