1% Licensed under the Apache License, Version 2.0 (the "License"); you may not
2% use this file except in compliance with the License. You may obtain a copy of
3% the License at
4%
5%   http://www.apache.org/licenses/LICENSE-2.0
6%
7% Unless required by applicable law or agreed to in writing, software
8% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10% License for the specific language governing permissions and limitations under
11% the License.
12
13% This module is for parsing and encoding all the DCP commands that are needed
14% by the indexer.
15-module(couch_dcp_consumer).
16
% Parsing of DCP packets
-export([parse_header/1, parse_snapshot_marker/1, parse_snapshot_mutation/4,
    parse_snapshot_deletion/2, parse_failover_log/1, parse_stat/4,
    parse_all_seqs/3]).
% Encoding of DCP packets
-export([encode_sasl_auth/3, encode_select_bucket/2, encode_open_connection/3,
    encode_stream_request/8, encode_stream_close/2,
    encode_failover_log_request/2, encode_stat_request/3,
    encode_all_seqs_request/1, encode_control_request/3,
    encode_buffer_request/2, encode_noop_response/1]).
24
25-include_lib("couch_dcp/include/couch_dcp.hrl").
26-include_lib("couch_dcp/include/couch_dcp_typespecs.hrl").
27
28
29% TODO vmx 2013-08-22: Bad match error handling
% Decodes the fixed-size (192 bit) DCP packet header into a tagged tuple.
%
% The magic byte selects the clause: a response carries a status where a
% request carries the partition id. The tag of the returned tuple is derived
% from the opcode, the remaining elements are the header fields that are
% handed on to the caller (request id, lengths, CAS, data type).
%
% A header with an unknown magic, or a response whose data type byte is not
% 0, raises `function_clause`. An opcode that is not listed in the matching
% clause raises `case_clause`.
-spec parse_header(<<_:192>>) ->
                          {open_connection | noop_request, request_id()} |
                          {buffer_ack | control_request, dcp_status(),
                           request_id()} |
                          {stream_request | failover_log | sasl_auth |
                           stream_close | select_bucket | all_seqs,
                           dcp_status(), request_id(), size()} |
                          {stats, dcp_status(), request_id(), size(),
                           size()} |
                          {stream_end | snapshot_marker, partition_id(),
                           request_id(), size()} |
                          {snapshot_mutation, partition_id(), request_id(),
                           size(), size(), size(), uint64(), byte()} |
                          {snapshot_deletion | snapshot_expiration,
                           partition_id(), request_id(), size(), size(),
                           uint64(), byte()}.
parse_header(<<?DCP_MAGIC_RESPONSE,
               Opcode,
               KeyLength:?DCP_SIZES_KEY_LENGTH,
               _ExtraLength,
               0,
               Status:?DCP_SIZES_STATUS,
               BodyLength:?DCP_SIZES_BODY,
               RequestId:?DCP_SIZES_OPAQUE,
               _Cas:?DCP_SIZES_CAS>>) ->
    case Opcode of
    ?DCP_OPCODE_STREAM_REQUEST ->
        {stream_request, Status, RequestId, BodyLength};
    ?DCP_OPCODE_OPEN_CONNECTION ->
        % The status of an open connection response is not passed on
        {open_connection, RequestId};
    ?DCP_OPCODE_FAILOVER_LOG_REQUEST ->
        {failover_log, Status, RequestId, BodyLength};
    ?DCP_OPCODE_STATS ->
        % The key length is needed to split the body into key and value
        {stats, Status, RequestId, BodyLength, KeyLength};
    ?DCP_OPCODE_SASL_AUTH ->
        {sasl_auth, Status, RequestId, BodyLength};
    ?DCP_OPCODE_STREAM_CLOSE ->
        {stream_close, Status, RequestId, BodyLength};
    ?DCP_OPCODE_SELECT_BUCKET ->
        {select_bucket, Status, RequestId, BodyLength};
    ?DCP_OPCODE_DCP_BUFFER ->
        {buffer_ack, Status, RequestId};
    ?DCP_OPCODE_DCP_CONTROL ->
        {control_request, Status, RequestId};
    ?DCP_OPCODE_SEQS ->
        {all_seqs, Status, RequestId, BodyLength}
    end;
parse_header(<<?DCP_MAGIC_REQUEST,
               Opcode,
               KeyLength:?DCP_SIZES_KEY_LENGTH,
               ExtraLength,
               DataType,
               PartId:?DCP_SIZES_PARTITION,
               BodyLength:?DCP_SIZES_BODY,
               RequestId:?DCP_SIZES_OPAQUE,
               Cas:?DCP_SIZES_CAS>>) ->
    case Opcode of
    ?DCP_OPCODE_STREAM_END ->
        {stream_end, PartId, RequestId, BodyLength};
    ?DCP_OPCODE_SNAPSHOT_MARKER ->
        {snapshot_marker, PartId, RequestId, BodyLength};
    ?DCP_OPCODE_MUTATION ->
        % Mutations are the only packets that also report the extras length
        {snapshot_mutation, PartId, RequestId, KeyLength, BodyLength,
            ExtraLength, Cas, DataType};
    ?DCP_OPCODE_DELETION ->
        {snapshot_deletion, PartId, RequestId, KeyLength, BodyLength, Cas,
            DataType};
    ?DCP_OPCODE_EXPIRATION ->
        {snapshot_expiration, PartId, RequestId, KeyLength, BodyLength, Cas,
            DataType};
    ?DCP_OPCODE_DCP_NOOP ->
        {noop_request, RequestId}
    end.
100
% Decodes the body of a snapshot marker packet: the start sequence number,
% the end sequence number and the snapshot type, in that order. A body of
% any other size raises `badmatch`.
-spec parse_snapshot_marker(<<_:160>>) ->
                                   {snapshot_marker, update_seq(),
                                    update_seq(), non_neg_integer()}.
parse_snapshot_marker(Body) ->
    <<SnapshotStart:?DCP_SIZES_BY_SEQ,
      SnapshotEnd:?DCP_SIZES_BY_SEQ,
      SnapshotType:?DCP_SIZES_SNAPSHOT_TYPE>> = Body,
    {snapshot_marker, SnapshotStart, SnapshotEnd, SnapshotType}.
109
110
% Decodes the body of a mutation packet into a #mutation{} record.
%
% The body consists of the extras (sequence number, revision sequence
% number, flags, expiration, lock time, metadata length and NRU), followed
% by the key, the value and the metadata. `KeyLength`, `BodyLength` and
% `ExtraLength` are the values taken from the packet header. The NRU field
% is read but not returned.
-spec parse_snapshot_mutation(size(), binary(), size(), size()) ->
                                     {snapshot_mutation, #mutation{}}.
parse_snapshot_mutation(KeyLength, Body, BodyLength, ExtraLength) ->
    <<BySeq:?DCP_SIZES_BY_SEQ,
      RevSeq:?DCP_SIZES_REV_SEQ,
      ItemFlags:?DCP_SIZES_FLAGS,
      Expiry:?DCP_SIZES_EXPIRATION,
      Lock:?DCP_SIZES_LOCK,
      MetaLength:?DCP_SIZES_METADATA_LENGTH,
      _Nru:?DCP_SIZES_NRU_LENGTH,
      DocKey:KeyLength/binary,
      ValueAndMeta/binary>> = Body,
    % The length of the value is not transmitted, it is what remains of the
    % body once the extras, the key and the metadata are accounted for
    ValueLength = BodyLength - (ExtraLength + KeyLength + MetaLength),
    <<DocValue:ValueLength/binary, Meta:MetaLength/binary>> = ValueAndMeta,
    Mutation = #mutation{
        seq = BySeq,
        rev_seq = RevSeq,
        flags = ItemFlags,
        expiration = Expiry,
        locktime = Lock,
        key = DocKey,
        value = DocValue,
        metadata = Meta
    },
    {snapshot_mutation, Mutation}.
136
% Decodes the body of a deletion packet: sequence number, revision sequence
% number and metadata length, followed by the key. The returned metadata is
% always the empty binary.
-spec parse_snapshot_deletion(size(), binary()) ->
                                     {snapshot_deletion,
                                      {update_seq(), non_neg_integer(),
                                       binary(), binary()}}.
parse_snapshot_deletion(KeyLength, Body) ->
    % XXX vmx 2014-01-07: No metadata support for now. Make it so it breaks
    % once it's there.
    % The metadata length is matched against a literal 0 and the key has to
    % be the very last part of the body, hence a packet that carries
    % metadata fails with a `badmatch`.
    <<BySeq:?DCP_SIZES_BY_SEQ,
      RevSeq:?DCP_SIZES_REV_SEQ,
      0:?DCP_SIZES_METADATA_LENGTH,
      DocKey:KeyLength/binary>> = Body,
    {snapshot_deletion, {BySeq, RevSeq, DocKey, <<>>}}.
151
152
% Decodes the body of a failover log response into a list of
% `{PartitionUuid, Seq}` pairs, in the order they appear in the body.
% A body that does not consist of whole entries only raises
% `function_clause`.
-spec parse_failover_log(binary()) -> {ok, partition_version()}.
parse_failover_log(Body) ->
    parse_failover_log(Body, []).

% Worker of parse_failover_log/1. The entries are accumulated in reverse
% order and put back into the original order once the body is consumed.
-spec parse_failover_log(binary(), partition_version()) ->
                                {ok, partition_version()}.
parse_failover_log(<<>>, Acc) ->
    {ok, lists:reverse(Acc)};
parse_failover_log(<<PartUuid:?DCP_SIZES_PARTITION_UUID,
                     PartSeq:?DCP_SIZES_BY_SEQ,
                     Rest/binary>>,
                   Acc) ->
    parse_failover_log(Rest, [{PartUuid, PartSeq}|Acc]).
164
165
% Splits the body of a stats response into the key and the value of the
% stat.
%
% A response without a key (key length 0) is returned as an error that
% carries the status and the unmodified body, no matter what the status is.
% A response with a key is only decoded if its status is ?DCP_STATUS_OK, any
% other status raises `function_clause`.
-spec parse_stat(binary(), dcp_status(), size(), size()) ->
                        {ok, {binary(), binary()}} |
                        {error, {dcp_status(), binary()}}.
parse_stat(Body, Status, 0, _ValueLength) ->
    {error, {Status, Body}};
parse_stat(Body, ?DCP_STATUS_OK, KeyLength, ValueLength) ->
    % Key and value together must make up the whole body
    <<StatKey:KeyLength/binary, StatValue:ValueLength/binary>> = Body,
    {ok, {StatKey, StatValue}}.
174
% Decodes the body of a response to the "get all sequence numbers" request
% into a list of `{PartitionId, Seq}` pairs, in the order they appear in the
% body. `Acc` holds the pairs decoded so far in reverse order, callers
% normally pass in the empty list.
%
% A status other than ?DCP_STATUS_OK, or a body that does not consist of
% whole entries only, results in an error that carries the status and the
% part of the body that was not decoded.
-spec parse_all_seqs(dcp_status(), binary(), list()) ->
                        {ok, list()} |
                        {error, {dcp_status(), binary()}}.
parse_all_seqs(?DCP_STATUS_OK,
               <<PartId:?DCP_SIZES_PARTITION,
                 Seq:?DCP_SIZES_BY_SEQ,
                 Remaining/binary>>,
               Acc) ->
    parse_all_seqs(?DCP_STATUS_OK, Remaining, [{PartId, Seq} | Acc]);
parse_all_seqs(?DCP_STATUS_OK, <<>>, Acc) ->
    {ok, lists:reverse(Acc)};
parse_all_seqs(Status, Body, _Acc) ->
    {error, {Status, Body}}.
186
% Encodes a SASL authentication request that uses the PLAIN mechanism.
%
% The key of the packet is the name of the mechanism, the value is the PLAIN
% message as defined by RFC 4616: an empty authorization identity, a NUL,
% the user name, a NUL and the password. The message is not NUL terminated
% (RFC 4616 does not allow a NUL as part of the password).
-spec encode_sasl_auth(binary(), binary(), request_id()) -> binary().
encode_sasl_auth(User, Passwd, RequestId) ->
    AuthType = <<"PLAIN">>,
    Body = <<AuthType/binary, $\0,
             User/binary, $\0,
             Passwd/binary>>,

    KeyLength = byte_size(AuthType),
    BodyLength = byte_size(Body),
    ExtraLength = 0,

    Header = <<?DCP_MAGIC_REQUEST,
               ?DCP_OPCODE_SASL_AUTH,
               KeyLength:?DCP_SIZES_KEY_LENGTH,
               ExtraLength,
               0,
               0:?DCP_SIZES_PARTITION,
               BodyLength:?DCP_SIZES_BODY,
               RequestId:?DCP_SIZES_OPAQUE,
               0:?DCP_SIZES_CAS>>,
    <<Header/binary, Body/binary>>.
208
209
210%DCP_SELECT_BUCKET command
211%Field        (offset) (value)
212%Magic        (0)    : 0x80
%Opcode       (1)    : 0x89
214%Key length   (2,3)  : 0x0006
215%Extra length (4)    : 0x00
216%Data type    (5)    : 0x00
217%Vbucket      (6,7)  : 0x0000
218%Total body   (8-11) : 0x00000006
219%Opaque       (12-15): 0x00000001
220%CAS          (16-23): 0x0000000000000000
221%Key          (24-29): bucket
222
% Encodes a select bucket request. The name of the bucket is the key of the
% packet, there are no extras and no value, hence the total body length is
% the same as the key length.
-spec encode_select_bucket(binary(), request_id()) -> binary().
encode_select_bucket(Bucket, RequestId) ->
    KeyLength = byte_size(Bucket),
    ExtraLength = 0,
    Header = <<?DCP_MAGIC_REQUEST,
               ?DCP_OPCODE_SELECT_BUCKET,
               KeyLength:?DCP_SIZES_KEY_LENGTH,
               ExtraLength,
               0,
               0:?DCP_SIZES_PARTITION,
               KeyLength:?DCP_SIZES_BODY,
               RequestId:?DCP_SIZES_OPAQUE,
               0:?DCP_SIZES_CAS>>,
    <<Header/binary, Bucket/binary>>.
236
237%DCP_OPEN command
238%Field        (offset) (value)
239%Magic        (0)    : 0x80
240%Opcode       (1)    : 0x50
241%Key length   (2,3)  : 0x0018
242%Extra length (4)    : 0x08
243%Data type    (5)    : 0x00
244%Vbucket      (6,7)  : 0x0000
245%Total body   (8-11) : 0x00000020
246%Opaque       (12-15): 0x00000001
247%CAS          (16-23): 0x0000000000000000
248%  seqno      (24-27): 0x00000000
249%  flags      (28-31): 0x00000000 (consumer)
250%Key          (32-55): bucketstream vb[100-105]
% Encodes an open connection request. The extras are a sequence number of 0
% and the flags, the key is the name of the connection.
% ?DCP_FLAG_PRODUCER is always set in addition to the given `Flags`.
-spec encode_open_connection(binary(), non_neg_integer(), request_id()) -> binary().
encode_open_connection(Name, Flags, RequestId) ->
    NameLength = byte_size(Name),
    Payload = <<0:?DCP_SIZES_SEQNO,
                (Flags bor ?DCP_FLAG_PRODUCER):?DCP_SIZES_FLAGS,
                Name/binary>>,
    PayloadLength = byte_size(Payload),
    % Everything in front of the name are extras
    ExtrasLength = PayloadLength - NameLength,

    <<?DCP_MAGIC_REQUEST,
      ?DCP_OPCODE_OPEN_CONNECTION,
      NameLength:?DCP_SIZES_KEY_LENGTH,
      ExtrasLength,
      0,
      0:?DCP_SIZES_PARTITION,
      PayloadLength:?DCP_SIZES_BODY,
      RequestId:?DCP_SIZES_OPAQUE,
      0:?DCP_SIZES_CAS,
      Payload/binary>>.
271
272%DCP_STREAM_REQ command
273%Field                  (offset) (value)
274%Magic                  (0)    : 0x80
275%Opcode                 (1)    : 0x53
276%Key length             (2,3)  : 0x0000
277%Extra length           (4)    : 0x30
278%Data type              (5)    : 0x00
279%Vbucket                (6,7)  : 0x0000
280%Total body             (8-11) : 0x00000030
281%Opaque                 (12-15): 0x00001000
282%CAS                    (16-23): 0x0000000000000000
283%  flags                (24-27): 0x00000000
284%  reserved             (28-31): 0x00000000
285%  start seqno          (32-39): 0x0000000000ffeedd
286%  end seqno            (40-47): 0xffffffffffffffff
287%  vb UUID              (48-55): 0x00000000feeddeca
288%  snapshot start seqno (56-63): 0x0000000000000000
289%  snapshot end seqno   (64-71): 0x0000000000ffeeff
% Encodes a stream request for the partition `PartId`. The packet has
% neither a key nor a value, the whole body consists of the extras: flags,
% a reserved field (always 0), start and end sequence number, the partition
% UUID and the start and end sequence number of the snapshot.
-spec encode_stream_request(partition_id(), request_id(), non_neg_integer(),
                            update_seq(), update_seq(),
                            uuid(), update_seq(), update_seq()) -> binary().
encode_stream_request(PartId, RequestId, Flags, StartSeq, EndSeq, PartUuid,
        SnapshotStart, SnapshotEnd) ->
    Extras = <<Flags:?DCP_SIZES_FLAGS,
               0:?DCP_SIZES_RESERVED,
               StartSeq:?DCP_SIZES_BY_SEQ,
               EndSeq:?DCP_SIZES_BY_SEQ,
               PartUuid:?DCP_SIZES_PARTITION_UUID,
               SnapshotStart:?DCP_SIZES_BY_SEQ,
               SnapshotEnd:?DCP_SIZES_BY_SEQ>>,
    % Extras length and total body length are the same
    ExtrasLength = byte_size(Extras),

    <<?DCP_MAGIC_REQUEST,
      ?DCP_OPCODE_STREAM_REQUEST,
      0:?DCP_SIZES_KEY_LENGTH,
      ExtrasLength,
      0,
      PartId:?DCP_SIZES_PARTITION,
      ExtrasLength:?DCP_SIZES_BODY,
      RequestId:?DCP_SIZES_OPAQUE,
      0:?DCP_SIZES_CAS,
      Extras/binary>>.
316
317
318%DCP_CLOSE_STREAM command
319%Field        (offset) (value)
320%Magic        (0)    : 0x80
321%Opcode       (1)    : 0x52
322%Key length   (2,3)  : 0x0000
323%Extra length (4)    : 0x00
324%Data type    (5)    : 0x00
325%Vbucket      (6,7)  : 0x0005
326%Total body   (8-11) : 0x00000000
327%Opaque       (12-15): 0xdeadbeef
328%CAS          (16-23): 0x0000000000000000
% Encodes a close stream request for the partition `PartId`. The packet
% consists of the header only, it has no key, no extras and no value.
-spec encode_stream_close(partition_id(), request_id()) -> binary().
encode_stream_close(PartId, RequestId) ->
    <<?DCP_MAGIC_REQUEST,
      ?DCP_OPCODE_STREAM_CLOSE,
      0:?DCP_SIZES_KEY_LENGTH,
      0,
      0,
      PartId:?DCP_SIZES_PARTITION,
      0:?DCP_SIZES_BODY,
      RequestId:?DCP_SIZES_OPAQUE,
      0:?DCP_SIZES_CAS>>.
340
341%DCP_GET_FAILOVER_LOG command
342%Field        (offset) (value)
343%Magic        (0)    : 0x80
344%Opcode       (1)    : 0x54
345%Key length   (2,3)  : 0x0000
346%Extra length (4)    : 0x00
347%Data type    (5)    : 0x00
348%Vbucket      (6,7)  : 0x0000
349%Total body   (8-11) : 0x00000000
350%Opaque       (12-15): 0xdeadbeef
351%CAS          (16-23): 0x0000000000000000
% Encodes a request for the failover log of the partition `PartId`. The
% packet consists of the header only, it has no key, no extras and no value.
-spec encode_failover_log_request(partition_id(), request_id()) -> binary().
encode_failover_log_request(PartId, RequestId) ->
    <<?DCP_MAGIC_REQUEST,
      ?DCP_OPCODE_FAILOVER_LOG_REQUEST,
      0:?DCP_SIZES_KEY_LENGTH,
      0,
      0,
      PartId:?DCP_SIZES_PARTITION,
      0:?DCP_SIZES_BODY,
      RequestId:?DCP_SIZES_OPAQUE,
      0:?DCP_SIZES_CAS>>.
364
365
366%Field        (offset) (value)
367%Magic        (0)    : 0x80
368%Opcode       (1)    : 0x10
%Key length   (2,3)  : 0x000f
%Extra length (4)    : 0x00
%Data type    (5)    : 0x00
%VBucket      (6,7)  : 0x0001
%Total body   (8-11) : 0x0000000f
374%Opaque       (12-15): 0x00000000
375%CAS          (16-23): 0x0000000000000000
376%Key                 : vbucket-seqno 1
% Encodes a stats request. The key of the packet is the name of the stat.
% If a partition is given, its ID is appended to the key (separated by a
% space) and is also put into the header. With `nil` as partition the key is
% the bare name of the stat and the partition in the header is 0.
-spec encode_stat_request(binary(), partition_id() | nil, request_id()) -> binary().
encode_stat_request(Stat, PartId, RequestId) ->
    {HeaderPartId, Key} = case PartId of
    nil ->
        {0, Stat};
    _ ->
        PartIdBin = list_to_binary(integer_to_list(PartId)),
        {PartId, <<Stat/binary, " ", PartIdBin/binary>>}
    end,
    % There are no extras and no value, the body is the key only
    KeyLength = byte_size(Key),

    <<?DCP_MAGIC_REQUEST,
      ?DCP_OPCODE_STATS,
      KeyLength:?DCP_SIZES_KEY_LENGTH,
      0,
      0,
      HeaderPartId:?DCP_SIZES_PARTITION,
      KeyLength:?DCP_SIZES_BODY,
      RequestId:?DCP_SIZES_OPAQUE,
      0:?DCP_SIZES_CAS,
      Key/binary>>.
401
402%GET_ALL_VB_SEQNOS command
403%Field        (offset) (value)
404%Magic        (0)    : 0x80
405%Opcode       (1)    : 0x48
406%Key length   (2,3)  : 0x0000
407%Extra length (4)    : 0x00
408%Data type    (5)    : 0x00
409%VBucket      (6,7)  : 0x0000
410%Total body   (8-11) : 0x00000000
411%Opaque       (12-15): 0x00000000
412%CAS          (16-23): 0x0000000000000000
% Encodes a request for the sequence numbers of all partitions. The packet
% consists of the header only, it has no key, no extras and no value.
-spec encode_all_seqs_request(request_id()) -> binary().
encode_all_seqs_request(RequestId) ->
    <<?DCP_MAGIC_REQUEST,
      ?DCP_OPCODE_SEQS,
      0:?DCP_SIZES_KEY_LENGTH,
      0,
      0,
      0:?DCP_SIZES_PARTITION,
      0:?DCP_SIZES_BODY,
      RequestId:?DCP_SIZES_OPAQUE,
      0:?DCP_SIZES_CAS>>.
424
425%DCP_CONTROL_BINARY_REQUEST command
426%Field        (offset) (value)
427%Magic        (0)    : 0x80
428%Opcode       (1)    : 0x5E
429%Key length   (2,3)  : 0x0016
430%Extra length (4)    : 0x00
431%Data type    (5)    : 0x00
432%VBucket      (6,7)  : 0x0000
433%Total body   (8-11) : 0x0000001a
434%Opaque       (12-15): 0x00000005
435%CAS          (16-23): 0x0000000000000000
436%Key                 : connection_buffer_size
437%Value               : 0x31303234
% Encodes a control request that sets a buffer size. The key of the packet
% is the name of the setting, the value is the buffer size as decimal
% string.
%
% Only the `connection` type is implemented, it sets
% "connection_buffer_size". Any other type raises `case_clause`.
-spec encode_control_request(request_id(), connection, integer())
                                                                -> binary().
encode_control_request(RequestId, Type, BufferSize) ->
    Key = case Type of
    connection ->
        <<"connection_buffer_size">>
    end,
    BufferSize2 = list_to_binary(integer_to_list(BufferSize)),
    Body = <<Key/binary, BufferSize2/binary>>,

    KeyLength = byte_size(Key),
    BodyLength = byte_size(Body),
    ExtraLength = 0,

    Header = <<?DCP_MAGIC_REQUEST,
               ?DCP_OPCODE_DCP_CONTROL,
               KeyLength:?DCP_SIZES_KEY_LENGTH,
               ExtraLength,
               0,
               0:?DCP_SIZES_PARTITION,
               BodyLength:?DCP_SIZES_BODY,
               RequestId:?DCP_SIZES_OPAQUE,
               0:?DCP_SIZES_CAS>>,
    <<Header/binary, Body/binary>>.
462
463%DCP_BUFFER_ACK_REQUEST command
464%Field        (offset) (value)
465%Magic        (0)    : 0x80
466%Opcode       (1)    : 0x5D
467%Key length   (2,3)  : 0x0000
468%Extra length (4)    : 0x04
469%Data type    (5)    : 0x00
470%VBucket      (6,7)  : 0x0000
471%Total body   (8-11) : 0x00000004
472%Opaque       (12-15): 0x00000000
473%CAS          (16-23): 0x0000000000000000
474%BufferSize   (24-27): 0x00001000
% Encodes a buffer acknowledgement request. The packet has neither a key
% nor a value, `BufferSize` is sent as the only extras field.
-spec encode_buffer_request(request_id(), size()) -> binary().
encode_buffer_request(RequestId, BufferSize) ->
    Extras = <<BufferSize:?DCP_SIZES_BUFFER_SIZE>>,
    % Extras length and total body length are the same
    ExtrasLength = byte_size(Extras),

    <<?DCP_MAGIC_REQUEST,
      ?DCP_OPCODE_DCP_BUFFER,
      0:?DCP_SIZES_KEY_LENGTH,
      ExtrasLength,
      0,
      0:?DCP_SIZES_PARTITION,
      ExtrasLength:?DCP_SIZES_BODY,
      RequestId:?DCP_SIZES_OPAQUE,
      0:?DCP_SIZES_CAS,
      Extras/binary>>.
490
491%DCP_NOOP response
492%Field        (offset) (value)
493%Magic        (0)    : 0x81
494%Opcode       (1)    : 0x5C
495%Key length   (2,3)  : 0x0000
496%Extra length (4)    : 0x00
497%Data type    (5)    : 0x00
498%VBucket      (6,7)  : 0x0000
499%Total body   (8-11) : 0x00000000
500%Opaque       (12-15): 0x00000005
501%CAS          (16-23): 0x0000000000000000
502
% Encodes the response to a no-op request. It is a bare response header
% (no key, no extras, no value) that carries the `RequestId` as opaque. The
% field in the position of the status is written as 0.
-spec encode_noop_response(request_id()) -> binary().
encode_noop_response(RequestId) ->
    Response = <<?DCP_MAGIC_RESPONSE,
                 ?DCP_OPCODE_DCP_NOOP,
                 0:?DCP_SIZES_KEY_LENGTH,
                 0,
                 0,
                 0:?DCP_SIZES_PARTITION,
                 0:?DCP_SIZES_BODY,
                 RequestId:?DCP_SIZES_OPAQUE,
                 0:?DCP_SIZES_CAS>>,
    Response.
514