1 /* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2 /*
3  *     Copyright 2013 Couchbase, Inc
4  *
5  *   Licensed under the Apache License, Version 2.0 (the "License");
6  *   you may not use this file except in compliance with the License.
7  *   You may obtain a copy of the License at
8  *
9  *       http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *   Unless required by applicable law or agreed to in writing, software
12  *   distributed under the License is distributed on an "AS IS" BASIS,
13  *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *   See the License for the specific language governing permissions and
15  *   limitations under the License.
16  */
17 
18 #ifndef SRC_DCP_RESPONSE_H_
19 #define SRC_DCP_RESPONSE_H_ 1
20 
21 #include "config.h"
22 
23 #include "ext_meta_parser.h"
24 #include "item.h"
25 
/**
 * Identifies the kind of DCP message a DcpResponse object represents.
 *
 * The numeric values are spelled out explicitly; they are the same values
 * the compiler would assign implicitly (0 upwards, in declaration order).
 */
typedef enum {
    DCP_MUTATION        = 0,
    DCP_DELETION        = 1,
    DCP_EXPIRATION      = 2,
    DCP_FLUSH           = 3,
    DCP_SET_VBUCKET     = 4,
    DCP_STREAM_REQ      = 5,
    DCP_STREAM_END      = 6,
    DCP_SNAPSHOT_MARKER = 7,
    DCP_ADD_STREAM      = 8
} dcp_event_t;
37 
38 
/**
 * Flag bits for snapshot markers.
 *
 * Every enumerator is a distinct single bit, so values can be combined
 * with bitwise OR.
 * NOTE(review): the names suggest memory/disk/checkpoint/ack semantics;
 * confirm against the DCP protocol documentation.
 */
typedef enum {
    MARKER_FLAG_MEMORY = 1 << 0,
    MARKER_FLAG_DISK   = 1 << 1,
    MARKER_FLAG_CHK    = 1 << 2,
    MARKER_FLAG_ACK    = 1 << 3
} dcp_marker_flag_t;
45 
46 class DcpResponse {
47 public:
DcpResponse(dcp_event_t event, uint32_t opaque)48     DcpResponse(dcp_event_t event, uint32_t opaque)
49         : opaque_(opaque), event_(event) {}
50 
~DcpResponse()51     virtual ~DcpResponse() {}
52 
getOpaque()53     uint32_t getOpaque() {
54         return opaque_;
55     }
56 
getEvent()57     dcp_event_t getEvent() {
58         return event_;
59     }
60 
61     virtual uint32_t getMessageSize() = 0;
62 
63 private:
64     uint32_t opaque_;
65     dcp_event_t event_;
66 };
67 
68 class StreamRequest : public DcpResponse {
69 public:
StreamRequest(uint16_t vbucket, uint32_t opaque, uint32_t flags, uint64_t startSeqno, uint64_t endSeqno, uint64_t vbucketUUID, uint64_t snapStartSeqno, uint64_t snapEndSeqno)70     StreamRequest(uint16_t vbucket, uint32_t opaque, uint32_t flags,
71                   uint64_t startSeqno, uint64_t endSeqno, uint64_t vbucketUUID,
72                   uint64_t snapStartSeqno, uint64_t snapEndSeqno)
73         : DcpResponse(DCP_STREAM_REQ, opaque), startSeqno_(startSeqno),
74           endSeqno_(endSeqno), vbucketUUID_(vbucketUUID),
75           snapStartSeqno_(snapStartSeqno), snapEndSeqno_(snapEndSeqno),
76           flags_(flags), vbucket_(vbucket) {}
77 
~StreamRequest()78     ~StreamRequest() {}
79 
getVBucket()80     uint16_t getVBucket() {
81         return vbucket_;
82     }
83 
getFlags()84     uint32_t getFlags() {
85         return flags_;
86     }
87 
getStartSeqno()88     uint64_t getStartSeqno() {
89         return startSeqno_;
90     }
91 
getEndSeqno()92     uint64_t getEndSeqno() {
93         return endSeqno_;
94     }
95 
getVBucketUUID()96     uint64_t getVBucketUUID() {
97         return vbucketUUID_;
98     }
99 
getSnapStartSeqno()100     uint64_t getSnapStartSeqno() {
101         return snapStartSeqno_;
102     }
103 
getSnapEndSeqno()104     uint64_t getSnapEndSeqno() {
105         return snapEndSeqno_;
106     }
107 
getMessageSize()108     uint32_t getMessageSize() {
109         return baseMsgBytes;
110     }
111 
112     static const uint32_t baseMsgBytes;
113 
114 private:
115     uint64_t startSeqno_;
116     uint64_t endSeqno_;
117     uint64_t vbucketUUID_;
118     uint64_t snapStartSeqno_;
119     uint64_t snapEndSeqno_;
120     uint32_t flags_;
121     uint16_t vbucket_;
122 };
123 
124 class AddStreamResponse : public DcpResponse {
125 public:
AddStreamResponse(uint32_t opaque, uint32_t streamOpaque, uint16_t status)126     AddStreamResponse(uint32_t opaque, uint32_t streamOpaque, uint16_t status)
127         : DcpResponse(DCP_ADD_STREAM, opaque), streamOpaque_(streamOpaque),
128           status_(status) {}
129 
~AddStreamResponse()130     ~AddStreamResponse() {}
131 
getStreamOpaque()132     uint32_t getStreamOpaque() {
133         return streamOpaque_;
134     }
135 
getStatus()136     uint16_t getStatus() {
137         return status_;
138     }
139 
getMessageSize()140     uint32_t getMessageSize() {
141         return baseMsgBytes;
142     }
143 
144     static const uint32_t baseMsgBytes;
145 
146 private:
147     uint32_t streamOpaque_;
148     uint16_t status_;
149 };
150 
151 class SnapshotMarkerResponse : public DcpResponse {
152 public:
SnapshotMarkerResponse(uint32_t opaque, uint16_t status)153     SnapshotMarkerResponse(uint32_t opaque, uint16_t status)
154         : DcpResponse(DCP_SNAPSHOT_MARKER, opaque), status_(status) {}
155 
getStatus()156     uint16_t getStatus() {
157         return status_;
158     }
159 
getMessageSize()160     uint32_t getMessageSize() {
161         return baseMsgBytes;
162     }
163 
164     static const uint32_t baseMsgBytes;
165 
166 private:
167     uint32_t status_;
168 };
169 
170 class SetVBucketStateResponse : public DcpResponse {
171 public:
SetVBucketStateResponse(uint32_t opaque, uint16_t status)172     SetVBucketStateResponse(uint32_t opaque, uint16_t status)
173         : DcpResponse(DCP_SET_VBUCKET, opaque), status_(status) {}
174 
getStatus()175     uint16_t getStatus() {
176         return status_;
177     }
178 
getMessageSize()179     uint32_t getMessageSize() {
180         return baseMsgBytes;
181     }
182 
183     static const uint32_t baseMsgBytes;
184 
185 private:
186     uint32_t status_;
187 };
188 
189 class StreamEndResponse : public DcpResponse {
190 public:
StreamEndResponse(uint32_t opaque, uint32_t flags, uint16_t vbucket)191     StreamEndResponse(uint32_t opaque, uint32_t flags, uint16_t vbucket)
192         : DcpResponse(DCP_STREAM_END, opaque), flags_(flags),
193           vbucket_(vbucket) {}
194 
getFlags()195     uint16_t getFlags() {
196         return flags_;
197     }
198 
getVbucket()199     uint32_t getVbucket() {
200         return vbucket_;
201     }
202 
getMessageSize()203     uint32_t getMessageSize() {
204         return baseMsgBytes;
205     }
206 
207     static const uint32_t baseMsgBytes;
208 
209 private:
210     uint32_t flags_;
211     uint16_t vbucket_;
212 };
213 
214 class SetVBucketState : public DcpResponse {
215 public:
SetVBucketState(uint32_t opaque, uint16_t vbucket, vbucket_state_t state)216     SetVBucketState(uint32_t opaque, uint16_t vbucket, vbucket_state_t state)
217         : DcpResponse(DCP_SET_VBUCKET, opaque), vbucket_(vbucket),
218           state_(state) {}
219 
getVBucket()220     uint16_t getVBucket() {
221         return vbucket_;
222     }
223 
getState()224     vbucket_state_t getState() {
225         return state_;
226     }
227 
getMessageSize()228     uint32_t getMessageSize() {
229         return baseMsgBytes;
230     }
231 
232     static const uint32_t baseMsgBytes;
233 
234 private:
235     uint16_t vbucket_;
236     vbucket_state_t state_;
237 };
238 
239 class SnapshotMarker : public DcpResponse {
240 public:
SnapshotMarker(uint32_t opaque, uint16_t vbucket, uint64_t start_seqno, uint64_t end_seqno, uint32_t flags)241     SnapshotMarker(uint32_t opaque, uint16_t vbucket, uint64_t start_seqno,
242                    uint64_t end_seqno, uint32_t flags)
243         : DcpResponse(DCP_SNAPSHOT_MARKER, opaque), vbucket_(vbucket),
244           start_seqno_(start_seqno), end_seqno_(end_seqno), flags_(flags) {}
245 
getVBucket()246     uint32_t getVBucket() {
247         return vbucket_;
248     }
249 
getStartSeqno()250     uint64_t getStartSeqno() {
251         return start_seqno_;
252     }
253 
getEndSeqno()254     uint64_t getEndSeqno() {
255         return end_seqno_;
256     }
257 
getFlags()258     uint32_t getFlags() {
259         return flags_;
260     }
261 
getMessageSize()262     uint32_t getMessageSize() {
263         return baseMsgBytes;
264     }
265 
266     static const uint32_t baseMsgBytes;
267 
268 private:
269     uint16_t vbucket_;
270     uint64_t start_seqno_;
271     uint64_t end_seqno_;
272     uint32_t flags_;
273 };
274 
275 class MutationResponse : public DcpResponse {
276 public:
MutationResponse(queued_item item, uint32_t opaque, ExtendedMetaData *e = NULL)277     MutationResponse(queued_item item, uint32_t opaque,
278                      ExtendedMetaData *e = NULL)
279         : DcpResponse(item->isDeleted() ? DCP_DELETION : DCP_MUTATION, opaque),
280           item_(item), emd(e) {}
281 
~MutationResponse()282     ~MutationResponse() {
283         if (emd) {
284             delete emd;
285         }
286     }
287 
getItem()288     queued_item& getItem() {
289         return item_;
290     }
291 
getItemCopy()292     Item* getItemCopy() {
293         return new Item(*item_);
294     }
295 
getVBucket()296     uint16_t getVBucket() {
297         return item_->getVBucketId();
298     }
299 
getBySeqno()300     uint64_t getBySeqno() {
301         return item_->getBySeqno();
302     }
303 
getRevSeqno()304     uint64_t getRevSeqno() {
305         return item_->getRevSeqno();
306     }
307 
getMessageSize()308     uint32_t getMessageSize() {
309         uint32_t base = item_->isDeleted() ? deletionBaseMsgBytes :
310                                              mutationBaseMsgBytes;
311         uint32_t body = item_->getNKey() + item_->getNBytes();
312         if (emd) {
313             body += emd->getExtMeta().second;
314         }
315         return base + body;
316     }
317 
getExtMetaData()318     ExtendedMetaData* getExtMetaData() {
319         return emd;
320     }
321 
322     static const uint32_t mutationBaseMsgBytes;
323     static const uint32_t deletionBaseMsgBytes;
324 
325 private:
326     queued_item item_;
327     ExtendedMetaData *emd;
328 };
329 
330 #endif  // SRC_DCP_RESPONSE_H_
331