1% Licensed under the Apache License, Version 2.0 (the "License"); you may not
2% use this file except in compliance with the License. You may obtain a copy of
3% the License at
4%
5%   http://www.apache.org/licenses/LICENSE-2.0
6%
7% Unless required by applicable law or agreed to in writing, software
8% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10% License for the specific language governing permissions and limitations under
11% the License.
12
13% This module is for parsing and encoding all the DCP commands that are needed
14% by the indexer.
15-module(couch_dcp_consumer).
16
17-export([parse_header/1, parse_snapshot_marker/1, parse_snapshot_mutation/4,
18    parse_snapshot_deletion/2, parse_failover_log/1, parse_stat/4]).
19-export([encode_sasl_auth/3, encode_open_connection/3, encode_stream_request/8,
20    encode_failover_log_request/2, encode_stat_request/3, encode_stream_close/2,
21    encode_select_bucket/2]).
22-export([encode_noop_response/1, encode_buffer_request/2,
23    encode_control_request/3, parse_all_seqs/3, encode_all_seqs_request/1]).
24
25-include_lib("couch_dcp/include/couch_dcp.hrl").
26-include_lib("couch_dcp/include/couch_dcp_typespecs.hrl").
27
28
29% TODO vmx 2013-08-22: Bad match error handling
% Decodes a DCP packet header (24 bytes, as stated by the spec) into a tagged
% tuple that tells the caller which kind of packet it is and how to read the
% body that follows.
%
% Response packets (magic ?DCP_MAGIC_RESPONSE) yield the status and the
% request ID, plus the body length where the caller needs it. `stats`
% additionally reports the key length. `open_connection` reports only the
% request ID; its status is not returned.
%
% Request packets (magic ?DCP_MAGIC_REQUEST) yield the partition ID, the
% request ID and the lengths required to split the body. Mutations, deletions
% and expirations also carry the CAS and the data type byte. `noop_request`
% reports only the request ID.
%
% There is no catch-all: an unknown opcode fails with `case_clause`; an
% unknown magic byte, or a response whose data type byte is not 0, fails with
% `function_clause`.
-spec parse_header(<<_:192>>) ->
                          {open_connection | noop_request, request_id()} |
                          {buffer_ack | control_request,
                           dcp_status(), request_id()} |
                          {stream_request | failover_log | sasl_auth |
                           stream_close | select_bucket | all_seqs,
                           dcp_status(), request_id(), size()} |
                          {stats, dcp_status(), request_id(), size(),
                           size()} |
                          {stream_end | snapshot_marker,
                           partition_id(), request_id(), size()} |
                          {snapshot_mutation, partition_id(), request_id(),
                           size(), size(), size(), uint64(), byte()} |
                          {snapshot_deletion | snapshot_expiration,
                           partition_id(), request_id(), size(), size(),
                           uint64(), byte()}.
parse_header(<<?DCP_MAGIC_RESPONSE,
               Opcode,
               KeyLength:?DCP_SIZES_KEY_LENGTH,
               _ExtraLength,
               0,
               Status:?DCP_SIZES_STATUS,
               BodyLength:?DCP_SIZES_BODY,
               RequestId:?DCP_SIZES_OPAQUE,
               _Cas:?DCP_SIZES_CAS>>) ->
    case Opcode of
    ?DCP_OPCODE_STREAM_REQUEST ->
        {stream_request, Status, RequestId, BodyLength};
    ?DCP_OPCODE_OPEN_CONNECTION ->
        {open_connection, RequestId};
    ?DCP_OPCODE_FAILOVER_LOG_REQUEST ->
        {failover_log, Status, RequestId, BodyLength};
    ?DCP_OPCODE_STATS ->
        % The key length lets the caller split the body into key and value.
        {stats, Status, RequestId, BodyLength, KeyLength};
    ?DCP_OPCODE_SASL_AUTH ->
        {sasl_auth, Status, RequestId, BodyLength};
    ?DCP_OPCODE_STREAM_CLOSE ->
        {stream_close, Status, RequestId, BodyLength};
    ?DCP_OPCODE_SELECT_BUCKET ->
        {select_bucket, Status, RequestId, BodyLength};
    ?DCP_OPCODE_DCP_BUFFER ->
        {buffer_ack, Status, RequestId};
    ?DCP_OPCODE_DCP_CONTROL ->
        {control_request, Status, RequestId};
    ?DCP_OPCODE_SEQS ->
        {all_seqs, Status, RequestId, BodyLength}
    end;
parse_header(<<?DCP_MAGIC_REQUEST,
               Opcode,
               KeyLength:?DCP_SIZES_KEY_LENGTH,
               ExtraLength,
               DataType,
               PartId:?DCP_SIZES_PARTITION,
               BodyLength:?DCP_SIZES_BODY,
               RequestId:?DCP_SIZES_OPAQUE,
               Cas:?DCP_SIZES_CAS>>) ->
    case Opcode of
    ?DCP_OPCODE_STREAM_END ->
        {stream_end, PartId, RequestId, BodyLength};
    ?DCP_OPCODE_SNAPSHOT_MARKER ->
        {snapshot_marker, PartId, RequestId, BodyLength};
    ?DCP_OPCODE_MUTATION ->
        % Only mutations report the extra length; parse_snapshot_mutation/4
        % takes it as an argument to compute the value length.
        {snapshot_mutation, PartId, RequestId, KeyLength, BodyLength,
            ExtraLength, Cas, DataType};
    ?DCP_OPCODE_DELETION ->
        {snapshot_deletion, PartId, RequestId, KeyLength, BodyLength, Cas,
            DataType};
    ?DCP_OPCODE_EXPIRATION ->
        {snapshot_expiration, PartId, RequestId, KeyLength, BodyLength, Cas,
            DataType};
    ?DCP_OPCODE_DCP_NOOP ->
        {noop_request, RequestId}
    end.
100
% Splits the body of a snapshot marker packet into the first sequence number
% of the snapshot, the last one, and the snapshot type.
%
% The body must consist of exactly those three fields; anything else fails
% with `badmatch`.
-spec parse_snapshot_marker(<<_:160>>) ->
                                   {snapshot_marker, update_seq(),
                                    update_seq(), non_neg_integer()}.
parse_snapshot_marker(Body) ->
    <<SnapshotStart:?DCP_SIZES_BY_SEQ,
      SnapshotEnd:?DCP_SIZES_BY_SEQ,
      SnapshotType:?DCP_SIZES_SNAPSHOT_TYPE>> = Body,
    Marker = {snapshot_marker, SnapshotStart, SnapshotEnd, SnapshotType},
    Marker.
109
110
% Parses the body of a mutation packet into a #mutation{} record.
%
% KeyLength, BodyLength and ExtraLength are the values reported by the packet
% header. The body starts with the fixed-size extras, followed by the key, the
% value and finally the metadata. The value has no length field of its own;
% its length is whatever remains of the body once extras, key and metadata
% are accounted for.
%
% A body that does not have this layout fails with `badmatch`.
-spec parse_snapshot_mutation(size(), binary(), size(), size()) ->
                                     {snapshot_mutation, #mutation{}}.
parse_snapshot_mutation(KeyLength, Body, BodyLength, ExtraLength) ->
    <<BySeq:?DCP_SIZES_BY_SEQ,
      RevNo:?DCP_SIZES_REV_SEQ,
      ItemFlags:?DCP_SIZES_FLAGS,
      Expiry:?DCP_SIZES_EXPIRATION,
      Lock:?DCP_SIZES_LOCK,
      MetaLength:?DCP_SIZES_METADATA_LENGTH,
      _Nru:?DCP_SIZES_NRU_LENGTH,
      DocKey:KeyLength/binary,
      Payload/binary>> = Body,
    % Payload is the value immediately followed by the metadata.
    PayloadLength = BodyLength - ExtraLength - KeyLength,
    ValueLength = PayloadLength - MetaLength,
    <<DocValue:ValueLength/binary, Meta:MetaLength/binary>> = Payload,
    Mutation = #mutation{
        seq = BySeq,
        rev_seq = RevNo,
        flags = ItemFlags,
        expiration = Expiry,
        locktime = Lock,
        key = DocKey,
        value = DocValue,
        metadata = Meta
    },
    {snapshot_mutation, Mutation}.
136
% Parses the body of a deletion packet.
%
% Returns the sequence number, the revision sequence number, the key, the
% metadata and whatever bytes follow the metadata (bound as XATTRs).
%
% Metadata is not supported: the metadata length field must be 0, so the
% returned metadata is always the empty binary. A packet that announces
% metadata fails with `badmatch`.
-spec parse_snapshot_deletion(size(), binary()) ->
                                     {snapshot_deletion,
                                      {update_seq(), non_neg_integer(),
                                       binary(), binary(), binary()}}.
parse_snapshot_deletion(KeyLength, Body) ->
    % XXX vmx 2014-01-07: No metadata support for now. Make it so it breaks
    % once it's there.
    % MetadataLength is already bound here, so the pattern below asserts that
    % the field in the packet equals 0 instead of reading it.
    MetadataLength = 0,
    <<Seq:?DCP_SIZES_BY_SEQ,
      RevSeq:?DCP_SIZES_REV_SEQ,
      MetadataLength:?DCP_SIZES_METADATA_LENGTH,
      Key:KeyLength/binary,
      Metadata:MetadataLength/binary,
      XATTRs/binary>> = Body,
    {snapshot_deletion, {Seq, RevSeq, Key, Metadata, XATTRs}}.
152
153
% Parses the body of a failover log response into a list of
% `{PartUuid, PartSeq}` entries, in the order in which they appear in the
% body. An empty body yields an empty list.
%
% A body that is not a whole number of entries fails with `function_clause`.
-spec parse_failover_log(binary()) -> {ok, partition_version()}.
parse_failover_log(Body) ->
    parse_failover_log(Body, []).

% Worker for parse_failover_log/1: consumes one entry per call and collects
% the entries in reverse, restoring the original order at the end.
-spec parse_failover_log(binary(), partition_version()) ->
                                {ok, partition_version()}.
parse_failover_log(<<>>, Acc) ->
    {ok, lists:reverse(Acc)};
parse_failover_log(<<PartUuid:?DCP_SIZES_PARTITION_UUID,
                     PartSeq:?DCP_SIZES_BY_SEQ,
                     Rest/binary>>,
                   Acc) ->
    parse_failover_log(Rest, [{PartUuid, PartSeq}|Acc]).
165
166
% Splits the body of a stats response into the stat's key and value.
%
% A packet with a key length of 0 is always returned as
% `{error, {Status, Body}}`, whatever its status. Otherwise the status must
% be ?DCP_STATUS_OK and the body must be exactly KeyLength + ValueLength
% bytes long; any other status fails with `function_clause` and a body of the
% wrong length fails with `badmatch`.
-spec parse_stat(binary(), dcp_status(), size(), size()) ->
                        {ok, {binary(), binary()}} |
                        {error, {dcp_status(), binary()}}.
parse_stat(Body, Status, KeyLength, _ValueLength) when KeyLength =:= 0 ->
    Reason = {Status, Body},
    {error, Reason};
parse_stat(Body, ?DCP_STATUS_OK, KeyLength, ValueLength) ->
    <<StatName:KeyLength/binary, StatValue:ValueLength/binary>> = Body,
    Stat = {StatName, StatValue},
    {ok, Stat}.
175
% Parses the body of a response to the "get all sequence numbers" request
% into a list of `{PartId, Seq}` pairs, prepending nothing to and preserving
% the order of the body. Acc holds the pairs parsed so far, most recent
% first; external callers would pass the empty list.
%
% Anything that cannot be parsed is reported as `{error, {Status, Body}}`:
% a status other than ?DCP_STATUS_OK, or trailing bytes that do not form a
% whole entry (Body is then the unparsed remainder).
-spec parse_all_seqs(dcp_status(), binary(), list()) ->
                        {ok, list()} |
                        {error, {dcp_status(), binary()}}.
parse_all_seqs(?DCP_STATUS_OK, <<>>, Acc) ->
    {ok, lists:reverse(Acc)};
parse_all_seqs(?DCP_STATUS_OK,
               <<PartId:?DCP_SIZES_PARTITION,
                 Seq:?DCP_SIZES_BY_SEQ,
                 Remaining/binary>>,
               Acc) ->
    parse_all_seqs(?DCP_STATUS_OK, Remaining, [{PartId, Seq} | Acc]);
parse_all_seqs(Status, Body, _Acc) ->
    {error, {Status, Body}}.
187
% Encodes a SASL authentication request that uses the PLAIN mechanism.
%
% The mechanism name is the key of the packet. The value is the PLAIN message
% `authzid NUL authcid NUL passwd` with an empty authorization identity, which
% is why a NUL directly follows the mechanism name. The password is the last
% field and is not NUL terminated: a trailing NUL would be taken as part of
% the password.
-spec encode_sasl_auth(binary(), binary(), request_id()) -> binary().
encode_sasl_auth(User, Passwd, RequestId) ->
    AuthType = <<"PLAIN">>,
    Body = <<AuthType/binary, $\0,
             User/binary, $\0,
             Passwd/binary>>,

    KeyLength = byte_size(AuthType),
    BodyLength = byte_size(Body),
    ExtraLength = 0,

    Header = <<?DCP_MAGIC_REQUEST,
               ?DCP_OPCODE_SASL_AUTH,
               KeyLength:?DCP_SIZES_KEY_LENGTH,
               ExtraLength,
               0,
               0:?DCP_SIZES_PARTITION,
               BodyLength:?DCP_SIZES_BODY,
               RequestId:?DCP_SIZES_OPAQUE,
               0:?DCP_SIZES_CAS>>,
    <<Header/binary, Body/binary>>.
209
210
211%DCP_SELECT_BUCKET command
212%Field        (offset) (value)
213%Magic        (0)    : 0x80
%Opcode       (1)    : 0x89
215%Key length   (2,3)  : 0x0006
216%Extra length (4)    : 0x00
217%Data type    (5)    : 0x00
218%Vbucket      (6,7)  : 0x0000
219%Total body   (8-11) : 0x00000006
220%Opaque       (12-15): 0x00000001
221%CAS          (16-23): 0x0000000000000000
222%Key          (24-29): bucket
223
% Encodes a select bucket request. The bucket name is the key and the only
% content of the body, so the key length doubles as the total body length.
-spec encode_select_bucket(binary(), request_id()) -> binary().
encode_select_bucket(Bucket, RequestId) ->
    KeyLength = byte_size(Bucket),
    ExtraLength = 0,
    Header = <<?DCP_MAGIC_REQUEST,
               ?DCP_OPCODE_SELECT_BUCKET,
               KeyLength:?DCP_SIZES_KEY_LENGTH,
               ExtraLength,
               0,
               0:?DCP_SIZES_PARTITION,
               KeyLength:?DCP_SIZES_BODY,
               RequestId:?DCP_SIZES_OPAQUE,
               0:?DCP_SIZES_CAS>>,
    <<Header/binary, Bucket/binary>>.
237
238%DCP_OPEN command
239%Field        (offset) (value)
240%Magic        (0)    : 0x80
241%Opcode       (1)    : 0x50
242%Key length   (2,3)  : 0x0018
243%Extra length (4)    : 0x08
244%Data type    (5)    : 0x00
245%Vbucket      (6,7)  : 0x0000
246%Total body   (8-11) : 0x00000020
247%Opaque       (12-15): 0x00000001
248%CAS          (16-23): 0x0000000000000000
249%  seqno      (24-27): 0x00000000
250%  flags      (28-31): 0x00000000 (consumer)
251%Key          (32-55): bucketstream vb[100-105]
%
% Encodes the request that opens a named DCP connection.
%
% The extras are a zero sequence number followed by the flags; the connection
% name is the key. ?DCP_FLAG_PRODUCER is always set in addition to the flags
% passed by the caller.
-spec encode_open_connection(binary(), non_neg_integer(), request_id()) -> binary().
encode_open_connection(Name, Flags, RequestId) ->
    OpenFlags = Flags bor ?DCP_FLAG_PRODUCER,
    Payload = <<0:?DCP_SIZES_SEQNO,
                OpenFlags:?DCP_SIZES_FLAGS,
                Name/binary>>,
    NameLength = byte_size(Name),
    PayloadLength = byte_size(Payload),
    % Everything in the payload that is not the name is extras.
    ExtrasLength = PayloadLength - NameLength,
    <<?DCP_MAGIC_REQUEST,
      ?DCP_OPCODE_OPEN_CONNECTION,
      NameLength:?DCP_SIZES_KEY_LENGTH,
      ExtrasLength,
      0,
      0:?DCP_SIZES_PARTITION,
      PayloadLength:?DCP_SIZES_BODY,
      RequestId:?DCP_SIZES_OPAQUE,
      0:?DCP_SIZES_CAS,
      Payload/binary>>.
272
273%DCP_STREAM_REQ command
274%Field                  (offset) (value)
275%Magic                  (0)    : 0x80
276%Opcode                 (1)    : 0x53
277%Key length             (2,3)  : 0x0000
278%Extra length           (4)    : 0x30
279%Data type              (5)    : 0x00
280%Vbucket                (6,7)  : 0x0000
281%Total body             (8-11) : 0x00000030
282%Opaque                 (12-15): 0x00001000
283%CAS                    (16-23): 0x0000000000000000
284%  flags                (24-27): 0x00000000
285%  reserved             (28-31): 0x00000000
286%  start seqno          (32-39): 0x0000000000ffeedd
287%  end seqno            (40-47): 0xffffffffffffffff
288%  vb UUID              (48-55): 0x00000000feeddeca
289%  snapshot start seqno (56-63): 0x0000000000000000
290%  snapshot end seqno   (64-71): 0x0000000000ffeeff
%
% Encodes a stream request for one partition.
%
% The packet has no key and no value: the whole body consists of extras,
% namely the flags, a zeroed reserved field, the requested sequence number
% range, the partition UUID and the bounds of the snapshot.
-spec encode_stream_request(partition_id(), request_id(), non_neg_integer(),
                            update_seq(), update_seq(),
                            uuid(), update_seq(), update_seq()) -> binary().
encode_stream_request(PartId, RequestId, Flags, StartSeq, EndSeq, PartUuid,
        SnapshotStart, SnapshotEnd) ->
    Extras = <<Flags:?DCP_SIZES_FLAGS,
               0:?DCP_SIZES_RESERVED,
               StartSeq:?DCP_SIZES_BY_SEQ,
               EndSeq:?DCP_SIZES_BY_SEQ,
               PartUuid:?DCP_SIZES_PARTITION_UUID,
               SnapshotStart:?DCP_SIZES_BY_SEQ,
               SnapshotEnd:?DCP_SIZES_BY_SEQ>>,
    % The extras are the entire body, so both lengths are the same.
    ExtrasLength = byte_size(Extras),
    <<?DCP_MAGIC_REQUEST,
      ?DCP_OPCODE_STREAM_REQUEST,
      0:?DCP_SIZES_KEY_LENGTH,
      ExtrasLength,
      0,
      PartId:?DCP_SIZES_PARTITION,
      ExtrasLength:?DCP_SIZES_BODY,
      RequestId:?DCP_SIZES_OPAQUE,
      0:?DCP_SIZES_CAS,
      Extras/binary>>.
317
318
319%DCP_CLOSE_STREAM command
320%Field        (offset) (value)
321%Magic        (0)    : 0x80
322%Opcode       (1)    : 0x52
323%Key length   (2,3)  : 0x0000
324%Extra length (4)    : 0x00
325%Data type    (5)    : 0x00
326%Vbucket      (6,7)  : 0x0005
327%Total body   (8-11) : 0x00000000
328%Opaque       (12-15): 0xdeadbeef
329%CAS          (16-23): 0x0000000000000000
%
% Encodes the request that closes the stream of one partition. The packet
% consists of the header only; the partition is named by the header's
% partition field.
-spec encode_stream_close(partition_id(), request_id()) -> binary().
encode_stream_close(PartId, RequestId) ->
    Header = <<?DCP_MAGIC_REQUEST,
               ?DCP_OPCODE_STREAM_CLOSE,
               0:?DCP_SIZES_KEY_LENGTH,
               0,
               0,
               PartId:?DCP_SIZES_PARTITION,
               0:?DCP_SIZES_BODY,
               RequestId:?DCP_SIZES_OPAQUE,
               0:?DCP_SIZES_CAS>>,
    Header.
341
342%DCP_GET_FAILOVER_LOG command
343%Field        (offset) (value)
344%Magic        (0)    : 0x80
345%Opcode       (1)    : 0x54
346%Key length   (2,3)  : 0x0000
347%Extra length (4)    : 0x00
348%Data type    (5)    : 0x00
349%Vbucket      (6,7)  : 0x0000
350%Total body   (8-11) : 0x00000000
351%Opaque       (12-15): 0xdeadbeef
352%CAS          (16-23): 0x0000000000000000
%
% Encodes the request for the failover log of one partition. The packet
% consists of the header only, with key, extras and body lengths all zero.
-spec encode_failover_log_request(partition_id(), request_id()) -> binary().
encode_failover_log_request(PartId, RequestId) ->
    KeyLength = 0,
    ExtraLength = 0,
    DataType = 0,
    BodyLength = 0,
    Cas = 0,
    <<?DCP_MAGIC_REQUEST,
      ?DCP_OPCODE_FAILOVER_LOG_REQUEST,
      KeyLength:?DCP_SIZES_KEY_LENGTH,
      ExtraLength,
      DataType,
      PartId:?DCP_SIZES_PARTITION,
      BodyLength:?DCP_SIZES_BODY,
      RequestId:?DCP_SIZES_OPAQUE,
      Cas:?DCP_SIZES_CAS>>.
365
366
367%Field        (offset) (value)
368%Magic        (0)    : 0x80
369%Opcode       (1)    : 0x10
%Key length   (2,3)  : 0x000f
%Extra length (4)    : 0x00
%Data type    (5)    : 0x00
%VBucket      (6,7)  : 0x0001
%Total body   (8-11) : 0x0000000f
375%Opaque       (12-15): 0x00000000
376%CAS          (16-23): 0x0000000000000000
377%Key                 : vbucket-seqno 1
%
% Encodes a stats request.
%
% With PartId `nil` the stat name alone is the key and the header's partition
% field is 0. With a partition ID the key is the stat name, a space and the
% partition ID in decimal, and the header's partition field carries the same
% ID. The key is the entire body.
-spec encode_stat_request(binary(), partition_id() | nil, request_id()) -> binary().
encode_stat_request(Stat, PartId, RequestId) ->
    {HeaderPartId, StatKey} = case PartId of
    nil ->
        {0, Stat};
    _ ->
        PartIdText = list_to_binary(integer_to_list(PartId)),
        {PartId, <<Stat/binary, " ", PartIdText/binary>>}
    end,
    StatKeyLength = byte_size(StatKey),
    <<?DCP_MAGIC_REQUEST,
      ?DCP_OPCODE_STATS,
      StatKeyLength:?DCP_SIZES_KEY_LENGTH,
      0,
      0,
      HeaderPartId:?DCP_SIZES_PARTITION,
      StatKeyLength:?DCP_SIZES_BODY,
      RequestId:?DCP_SIZES_OPAQUE,
      0:?DCP_SIZES_CAS,
      StatKey/binary>>.
402
403%GET_ALL_VB_SEQNOS command
404%Field        (offset) (value)
405%Magic        (0)    : 0x80
406%Opcode       (1)    : 0x48
407%Key length   (2,3)  : 0x0000
408%Extra length (4)    : 0x00
409%Data type    (5)    : 0x00
410%VBucket      (6,7)  : 0x0000
411%Total body   (8-11) : 0x00000000
412%Opaque       (12-15): 0x00000000
413%CAS          (16-23): 0x0000000000000000
%
% Encodes the request for the sequence numbers of all partitions. The packet
% consists of the header only; nothing but the request ID varies.
-spec encode_all_seqs_request(request_id()) -> binary().
encode_all_seqs_request(RequestId) ->
    Header = <<?DCP_MAGIC_REQUEST,
               ?DCP_OPCODE_SEQS,
               0:?DCP_SIZES_KEY_LENGTH,
               0,
               0,
               0:?DCP_SIZES_PARTITION,
               0:?DCP_SIZES_BODY,
               RequestId:?DCP_SIZES_OPAQUE,
               0:?DCP_SIZES_CAS>>,
    <<Header/binary>>.
425
426%DCP_CONTROL_BINARY_REQUEST command
427%Field        (offset) (value)
428%Magic        (0)    : 0x80
429%Opcode       (1)    : 0x5E
430%Key length   (2,3)  : 0x0016
431%Extra length (4)    : 0x00
432%Data type    (5)    : 0x00
433%VBucket      (6,7)  : 0x0000
434%Total body   (8-11) : 0x0000001a
435%Opaque       (12-15): 0x00000005
436%CAS          (16-23): 0x0000000000000000
437%Key                 : connection_buffer_size
438%Value               : 0x31303234
%
% Encodes a control request that sets a buffer size.
%
% The key is the name of the setting and the value is the buffer size written
% as decimal text. `connection` is the only supported Type and selects the
% `connection_buffer_size` setting; any other Type fails with `case_clause`.
-spec encode_control_request(request_id(), connection, integer())
                                                                -> binary().
encode_control_request(RequestId, Type, BufferSize) ->
    Key = case Type of
    connection ->
        <<"connection_buffer_size">>
    end,
    BufferSize2 = list_to_binary(integer_to_list(BufferSize)),
    Body = <<Key/binary, BufferSize2/binary>>,

    KeyLength =  byte_size(Key),
    BodyLength = byte_size(Body),
    ExtraLength = 0,

    Header = <<?DCP_MAGIC_REQUEST,
               ?DCP_OPCODE_DCP_CONTROL,
               KeyLength:?DCP_SIZES_KEY_LENGTH,
               ExtraLength,
               0,
               0:?DCP_SIZES_PARTITION,
               BodyLength:?DCP_SIZES_BODY,
               RequestId:?DCP_SIZES_OPAQUE,
               0:?DCP_SIZES_CAS>>,
    <<Header/binary, Body/binary>>.
463
464%DCP_BUFFER_ACK_REQUEST command
465%Field        (offset) (value)
466%Magic        (0)    : 0x80
467%Opcode       (1)    : 0x5D
468%Key length   (2,3)  : 0x0000
469%Extra length (4)    : 0x04
470%Data type    (5)    : 0x00
471%VBucket      (6,7)  : 0x0000
472%Total body   (8-11) : 0x00000004
473%Opaque       (12-15): 0x00000000
474%CAS          (16-23): 0x0000000000000000
475%BufferSize   (24-27): 0x00001000
%
% Encodes a buffer acknowledgement request. The buffer size is sent as the
% extras, which are the only content of the body.
-spec encode_buffer_request(request_id(), size()) -> binary().
encode_buffer_request(RequestId, BufferSize) ->
    AckedBytes = <<BufferSize:?DCP_SIZES_BUFFER_SIZE>>,
    % The extras are the entire body, so both lengths are the same.
    AckedLength = byte_size(AckedBytes),
    <<?DCP_MAGIC_REQUEST,
      ?DCP_OPCODE_DCP_BUFFER,
      0:?DCP_SIZES_KEY_LENGTH,
      AckedLength,
      0,
      0:?DCP_SIZES_PARTITION,
      AckedLength:?DCP_SIZES_BODY,
      RequestId:?DCP_SIZES_OPAQUE,
      0:?DCP_SIZES_CAS,
      AckedBytes/binary>>.
491
492%DCP_NOOP response
493%Field        (offset) (value)
494%Magic        (0)    : 0x81
495%Opcode       (1)    : 0x5C
496%Key length   (2,3)  : 0x0000
497%Extra length (4)    : 0x00
498%Data type    (5)    : 0x00
499%VBucket      (6,7)  : 0x0000
500%Total body   (8-11) : 0x00000000
501%Opaque       (12-15): 0x00000005
502%CAS          (16-23): 0x0000000000000000
503
% Encodes the response to a no-op request. The packet consists of the header
% only; every field other than the magic, the opcode and the request ID is
% zero. RequestId must be the ID of the no-op request being answered
% (presumably the one reported by parse_header/1 - confirm against caller).
-spec encode_noop_response(request_id()) -> binary().
encode_noop_response(RequestId) ->
    KeyLength = 0,
    ExtraLength = 0,
    DataType = 0,
    % Written with the partition field's size, as in the request header.
    PartitionField = 0,
    BodyLength = 0,
    Cas = 0,
    <<?DCP_MAGIC_RESPONSE,
      ?DCP_OPCODE_DCP_NOOP,
      KeyLength:?DCP_SIZES_KEY_LENGTH,
      ExtraLength,
      DataType,
      PartitionField:?DCP_SIZES_PARTITION,
      BodyLength:?DCP_SIZES_BODY,
      RequestId:?DCP_SIZES_OPAQUE,
      Cas:?DCP_SIZES_CAS>>.
515