1% Licensed under the Apache License, Version 2.0 (the "License"); you may not
2% use this file except in compliance with the License. You may obtain a copy of
3% the License at
4%
5%   http://www.apache.org/licenses/LICENSE-2.0
6%
7% Unless required by applicable law or agreed to in writing, software
8% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10% License for the specific language governing permissions and limitations under
11% the License.
12
13-module(couch_dcp_client).
14-behaviour(gen_server).
15
16% Public API
17-export([start/6]).
18-export([add_stream/6, get_seqs/2, get_num_items/2,
19    get_failover_log/2]).
20-export([get_stream_event/2, remove_stream/2, list_streams/1]).
21-export([enum_docs_since/8, restart_worker/1]).
22-export([get_seqs_async/1]).
23
24% gen_server callbacks
25-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2, code_change/3]).
26-export([format_status/2]).
27
28-include("couch_db.hrl").
29-include_lib("couch_dcp/include/couch_dcp.hrl").
30-include_lib("couch_dcp/include/couch_dcp_typespecs.hrl").
% Default value of #state.max_buffer_size (10485760 = 10 * 1024 * 1024;
% presumably bytes -- confirm against set_buffer_size/2).
31-define(MAX_BUF_SIZE, 10485760).
% Milliseconds: timeout for the add_stream/remove_stream gen_server calls and
% the interval after which a waiting get_stream_event/2 logs and keeps waiting.
32-define(TIMEOUT, 60000).
% Milliseconds to wait for a stats / all-seqs reply before logging and
% waiting again.
33-define(TIMEOUT_STATS, 2000).
% Milliseconds to sleep before restarting a receive worker that exited with
% {conn_error, _}.
34-define(DCP_RETRY_TIMEOUT, 2000).
% Value of the gen_tcp `buffer` option used when connecting.
35-define(DCP_BUF_WINDOW, 65535).
% Extract the raw socket from a #bufsocket{}.
36-define(SOCKET(BufSocket), BufSocket#bufsocket.sockpid).
37
38-type mutations_fold_fun() :: fun().
39-type mutations_fold_acc() :: any().
40
% A socket together with a buffer of received-but-unconsumed bytes
% (presumably maintained by bufsocket_recv/3 -- confirm).
41-record(bufsocket, {
42    sockpid = nil                   :: socket() | nil,
43    sockbuf = <<>>                  :: binary()
44}).
45
% gen_server state of the DCP client.
46-record(state, {
47    bufsocket = nil                 :: #bufsocket{} | nil,
48    timeout = 5000                  :: timeout(),
    % Id for the next request; advanced by next_request_id/1.
49    request_id = 0                  :: request_id(),
    % Outstanding requests; looked up by request id on stream_response.
50    pending_requests = dict:new()   :: dict(),
    % Per stream request id: {Waiter, EventQueue}.
51    stream_queues = dict:new()      :: dict(),
    % List of {PartId, _} entries (see list_streams).
52    active_streams = []             :: list(),
    % Linked receive worker that owns the socket.
53    worker_pid                      :: pid(),
    % Updated from the server's response to the internal control request.
54    max_buffer_size = ?MAX_BUF_SIZE :: integer(),
55    total_buffer_size = 0           :: non_neg_integer(),
    % Per stream request id: #stream_info{}.
56    stream_info = dict:new()        :: dict(),
    % init/1 arguments kept for reconnecting; includes the admin password.
57    args = []                       :: list()
58}).
59
% Metadata of one requested partition stream.
60-record(stream_info, {
61    part_id = 0                     :: partition_id(),
62    part_uuid = 0                   :: uuid(),
63    start_seq = 0                   :: update_seq(),
64    end_seq = 0                     :: update_seq(),
65    snapshot_seq = {0, 0}           :: {update_seq(), update_seq()},
66    flags = 0                       :: non_neg_integer()
67}).
68
69% This gen server implements a DCP client with vbucket stream multiplexing.
70% The client spawns a worker process to handle all response messages and event
71% messages received from the DCP server. For simplicity, messages are classified into
72% two types: stream_response and stream_event. For any DCP request, the
73% corresponding response message is called a stream_response. A vbucket stream request,
74% however, works like a subscription: after the response that initiates the stream arrives,
75% the server starts sending vbucket events. These messages are called stream_events.
76%
77% The user can request a stream using the add_stream() API and the corresponding events can
78% be received using the get_stream_event() API.
79
80
81% Public API
82
% Start (and link to the caller) a DCP client gen_server.
% Name is the DCP connection name; Bucket, AdmUser and AdmPasswd are used to
% authenticate and select the bucket; BufferSize and Flags are passed on to
% the connection setup in init/1.
-spec start(binary(), binary(), binary(), binary(), non_neg_integer(), non_neg_integer()) ->
    {ok, pid()} | ignore | {error, {already_started, pid()} | term()}.
start(Name, Bucket, AdmUser, AdmPasswd, BufferSize, Flags) ->
    InitArgs = [Name, Bucket, AdmUser, AdmPasswd, BufferSize, Flags],
    gen_server:start_link(?MODULE, InitArgs, []).
87
% Request a new stream for partition PartId covering StartSeq..EndSeq.
% The call returns once the server has answered the stream request.
-spec add_stream(pid(), partition_id(), uuid(), update_seq(),
    update_seq(), dcp_data_type()) -> {error, term()} | {request_id(), term()}.
add_stream(Pid, PartId, PartUuid, StartSeq, EndSeq, Flags) ->
    Request = {add_stream, PartId, PartUuid, StartSeq, EndSeq, Flags},
    gen_server:call(Pid, Request, ?TIMEOUT).
93
94
% Close the stream of partition PartId; returns once the server answered.
-spec remove_stream(pid(), partition_id()) ->
                            'ok' | {'error', term()}.
remove_stream(Pid, PartId) ->
    Request = {remove_stream, PartId},
    gen_server:call(Pid, Request, ?TIMEOUT).
99
100
% Return the partition ids of the currently active streams.
-spec list_streams(pid()) -> list().
list_streams(Pid) ->
    % 5000 ms is gen_server:call/2's default timeout.
    gen_server:call(Pid, list_streams, 5000).
104
105
% Wait for the reply to the get_stats request tagged with MRef.
% Every ?TIMEOUT_STATS ms without a reply an error is logged and waiting
% resumes. If the client Pid dies, the caller exits with
% {dcp_client_died, Pid, Reason}.
-spec get_stats_reply(pid(), reference()) -> term().
get_stats_reply(Pid, MRef) ->
    receive
    {'DOWN', MRef, process, Pid, DownReason} ->
        exit({dcp_client_died, Pid, DownReason});
    {get_stats, MRef, StatsReply} ->
        StatsReply
    after ?TIMEOUT_STATS ->
        WaitedSecs = ?TIMEOUT_STATS / 1000,
        ?LOG_ERROR("dcp client (~p): stats timed out after ~p seconds."
                   " Waiting...",
            [Pid, WaitedSecs]),
        get_stats_reply(Pid, MRef)
    end.
119
% Wait for the reply to the get_all_seqs request tagged with MRef.
% Every ?TIMEOUT_STATS ms without a reply an error is logged and waiting
% resumes. If the client Pid dies, the caller exits with
% {dcp_client_died, Pid, Reason}.
-spec get_all_seqs_reply(pid(), reference()) -> term().
get_all_seqs_reply(Pid, MRef) ->
    receive
    {'DOWN', MRef, process, Pid, DownReason} ->
        exit({dcp_client_died, Pid, DownReason});
    {all_seqs, MRef, SeqsReply} ->
        SeqsReply
    after ?TIMEOUT_STATS ->
        WaitedSecs = ?TIMEOUT_STATS / 1000,
        ?LOG_ERROR("dcp client (~p): dcp all seqs timed out after ~p seconds."
                   " Waiting...",
            [Pid, WaitedSecs]),
        get_all_seqs_reply(Pid, MRef)
    end.
133
% Request the stat group Name (optionally for PartId) from client Pid and
% block until the reply arrives. The request bypasses gen_server:call; the
% monitor reference tags the reply.
-spec get_stats(pid(), binary(), partition_id() | nil) -> term().
get_stats(Pid, Name, PartId) ->
    Ref = erlang:monitor(process, Pid),
    Caller = {Ref, self()},
    Pid ! {get_stats, Name, PartId, Caller},
    Result = get_stats_reply(Pid, Ref),
    erlang:demonitor(Ref, [flush]),
    Result.
141
142
% Ask client Pid for the sequence numbers of all partitions without waiting.
% The answer is delivered later to the calling process as the message
% {all_seqs, nil, StatsResponse}.
-spec get_seqs_async(pid()) -> ok.
get_seqs_async(Pid) ->
    ReplyTo = {nil, self()},
    Pid ! {get_all_seqs, ReplyTo},
    ok.
151
% Return the sequence numbers of all partitions (SortedPartIds =:= nil) or
% only of the partitions in the ordset SortedPartIds.
-spec get_seqs(pid(), ordsets:ordset(partition_id()) | nil) ->
         {ok, partition_seqs()} | {error, term()}.
get_seqs(Pid, SortedPartIds) ->
    case get_all_seqs(Pid) of
    {ok, AllSeqs} when SortedPartIds =:= nil ->
        {ok, AllSeqs};
    {ok, AllSeqs} ->
        {ok, couch_set_view_util:filter_seqs(SortedPartIds, AllSeqs)};
    {error, _} = Error ->
        Error
    end.
167
% Synchronously fetch the sequence numbers of all partitions from client Pid.
-spec get_all_seqs(pid()) -> term().
get_all_seqs(Pid) ->
    Ref = erlang:monitor(process, Pid),
    Caller = {Ref, self()},
    Pid ! {get_all_seqs, Caller},
    Result = get_all_seqs_reply(Pid, Ref),
    erlang:demonitor(Ref, [flush]),
    Result.
175
% Return the item count of partition PartId, read from the
% "vb_<PartId>:num_items" entry of the "vbucket-details" stats.
-spec get_num_items(pid(), partition_id()) ->
                           {ok, non_neg_integer()} | {error, not_my_vbucket}.
get_num_items(Pid, PartId) ->
    case get_stats(Pid, <<"vbucket-details">>, PartId) of
    {error, {?DCP_STATUS_NOT_MY_VBUCKET, _}} ->
        {error, not_my_vbucket};
    {ok, Stats} ->
        StatKey = iolist_to_binary(["vb_", integer_to_list(PartId), ":num_items"]),
        {_, NumItemsBin} = lists:keyfind(StatKey, 1, Stats),
        {ok, list_to_integer(binary_to_list(NumItemsBin))}
    end.
189
190
% Fetch the failover log of partition PartId from the DCP server.
%
% The server sends the request on the socket and replies only when the
% worker delivers the server's response. Like add_stream/6 and
% remove_stream/2, which also wait on a server round trip, the call therefore
% uses ?TIMEOUT instead of gen_server:call/2's 5 second default.
-spec get_failover_log(pid(), partition_id()) ->
                              {error, no_failover_log_found | dcp_status()} |
                              {ok, partition_version()}.
get_failover_log(Pid, PartId) ->
    gen_server:call(Pid, {get_failover_log, PartId}, ?TIMEOUT).
196
197
% Fetch the next event of the stream with request id ReqId from client Pid.
% Blocks until an event or an error is returned by the client.
-spec get_stream_event(pid(), request_id()) ->
                              {atom(), #dcp_doc{}} | {'error', term()}.
get_stream_event(Pid, ReqId) ->
    Monitor = erlang:monitor(process, Pid),
    Me = self(),
    Pid ! {get_stream_event, ReqId, Me},
    Event = get_stream_event_get_reply(Pid, ReqId, Monitor),
    erlang:demonitor(Monitor, [flush]),
    Event.
206
% Wait for the stream event reply for ReqId. After each ?TIMEOUT ms without
% a reply, the client is asked to log the stalled stream and waiting resumes.
% If the client dies, the caller exits with {dcp_client_died, Pid, Reason}.
get_stream_event_get_reply(Pid, ReqId, MRef) ->
    receive
    {'DOWN', MRef, process, Pid, Why} ->
        exit({dcp_client_died, Pid, Why});
    {stream_event, ReqId, EventReply} ->
        EventReply
    after ?TIMEOUT ->
        Pid ! {print_log, ReqId},
        get_stream_event_get_reply(Pid, ReqId, MRef)
    end.
218
% Stream the changes of partition PartId from StartSeq to EndSeq0 and fold
% every event into InAcc with CallbackFn.
%
% PartVersions is a list of {PartUuid, _} entries tried in order: when the
% server answers {error, vbucket_stream_not_found} for the head entry, the
% rest of the list is tried; an exhausted list yields {rollback, 0}.
% {error, vbucket_stream_tmp_fail} is retried after 100 ms with the same
% list, without a retry limit.
%
% On a {failoverlog, FailoverLog} response, CallbackFn is first called with
% {part_versions, {PartId, FailoverLog}} and then for the events consumed by
% receive_events/4; on success {ok, Acc, FailoverLog} is returned. Any other
% stream response is returned unchanged.
219-spec enum_docs_since(pid(), partition_id(), partition_version(), update_seq(),
220                      update_seq(), 0..255, mutations_fold_fun(),
221                      mutations_fold_acc()) ->
222                             {error, vbucket_stream_not_found |
223                              wrong_start_sequence_number |
224                              too_large_failover_log } |
225                             {rollback, update_seq()} |
226                             {ok, mutations_fold_acc(), partition_version()}.
227enum_docs_since(_, _, [], _, _, _, _, _) ->
228    % No matching partition version found. Recreate the index from scratch
229    {rollback, 0};
230enum_docs_since(Pid, PartId, PartVersions, StartSeq, EndSeq0, Flags,
231        CallbackFn, InAcc) ->
232    [PartVersion | PartVersionsRest] = PartVersions,
233    {PartUuid, _} = PartVersion,
    % An end before the start is logged as an expected rollback, and the end
    % is raised to StartSeq for the stream request.
234    EndSeq = case EndSeq0 < StartSeq of
235    true ->
236        ?LOG_INFO("dcp client (~p): Expecting a rollback for partition ~p. "
237        "Found start_seqno > end_seqno (~p > ~p).",
238        [Pid, PartId, StartSeq, EndSeq0]),
239        StartSeq;
240    false ->
241        EndSeq0
242    end,
243
244    case add_stream(Pid, PartId, PartUuid, StartSeq, EndSeq, Flags) of
245    {error, _} = Error ->
246        Error;
247    {RequestId, Resp} ->
248        case Resp of
249        {failoverlog, FailoverLog} ->
            % Refuse failover logs longer than ?DCP_MAX_FAILOVER_LOG_SIZE.
250            case length(FailoverLog) > ?DCP_MAX_FAILOVER_LOG_SIZE of
251            true ->
252                {error, too_large_failover_log};
253            false ->
254                InAcc2 = CallbackFn({part_versions, {PartId, FailoverLog}}, InAcc),
255                case receive_events(Pid, RequestId, CallbackFn, InAcc2) of
256                {ok, InAcc3} ->
257                    {ok, InAcc3, FailoverLog};
258                Error ->
259                    Error
260                end
261            end;
262        {error, vbucket_stream_not_found} ->
            % Try the next (older) partition version.
263            enum_docs_since(Pid, PartId, PartVersionsRest, StartSeq, EndSeq,
264                Flags, CallbackFn, InAcc);
265        {error, vbucket_stream_tmp_fail} ->
266            ?LOG_INFO("dcp client (~p): Temporary failure on stream request "
267                "on partition ~p. Retrying...", [Pid, PartId]),
268            timer:sleep(100),
269            enum_docs_since(Pid, PartId, PartVersions, StartSeq, EndSeq,
270                Flags, CallbackFn, InAcc);
271        _ ->
            % Any other response (e.g., presumably {rollback, Seq}) goes
            % straight back to the caller.
272            Resp
273        end
274    end.
275
276
277% gen_server callbacks
278
% gen_server init.
% Steps:
%   1. Connect to the local DCP port (couch_config "dcp"/"port"), using the
%      "dcp"/"connection_timeout" value as the connect timeout and as the
%      state timeout.
%   2. Authenticate with AdmUser/AdmPasswd, select Bucket, and open the DCP
%      connection Name with Flags.
%   3. Hand the socket to a linked receive worker and apply BufferSize via
%      set_buffer_size/2.
% Any failure stops the server with the error reason.
279-spec init([binary() | non_neg_integer()]) -> {ok, #state{}} |
280                    {stop, sasl_auth_failed | closed | inet:posix()}.
281init([Name, Bucket, AdmUser, AdmPasswd, BufferSize, Flags]) ->
282    DcpTimeout = list_to_integer(
283        couch_config:get("dcp", "connection_timeout")),
284    DcpPort = list_to_integer(couch_config:get("dcp", "port")),
285
286    case gen_tcp:connect("localhost", DcpPort,
287        [binary, {packet, raw}, {active, true}, {nodelay, true},
288                 {buffer, ?DCP_BUF_WINDOW}], DcpTimeout) of
289    {ok, SockPid} ->
290        BufSocket = #bufsocket{sockpid = SockPid, sockbuf = <<>>},
291        State = #state{
292            bufsocket = BufSocket,
293            timeout = DcpTimeout,
294            request_id = 0
295        },
296        % Auth as admin and select bucket for the connection
297        case sasl_auth(AdmUser, AdmPasswd, State) of
298        {ok, State2} ->
299            case select_bucket(Bucket, State2) of
300            {ok, State3} ->
301                % Store the meta information to reconnect
                % NOTE(review): Args includes the admin password in clear text.
302                Args = [Name, Bucket, AdmUser, AdmPasswd, BufferSize, Flags],
303                State4 = State3#state{args = Args},
304                case open_connection(Name, Flags, State4) of
305                {ok, State5} ->
306                    Parent = self(),
                    % Exits of the linked worker arrive as 'EXIT' messages,
                    % handled in handle_info/2.
307                    process_flag(trap_exit, true),
308                    #state{bufsocket = BufSocket2} = State5,
309                    WorkerPid = spawn_link(
310                        fun() ->
311                            receive_worker(BufSocket2, DcpTimeout, Parent, [])
312                        end),
                    % The socket is in active mode. Making the worker the
                    % controlling process routes socket messages to it.
                    % Uses the original BufSocket; presumably the same
                    % sockpid as BufSocket2 -- confirm.
313                    gen_tcp:controlling_process(?SOCKET(BufSocket), WorkerPid),
314                    case set_buffer_size(State5, BufferSize) of
315                    {ok, State6} ->
316                        {ok, State6#state{worker_pid = WorkerPid}};
317                    {error, Reason} ->
318                        exit(WorkerPid, shutdown),
319                        {stop, Reason}
320                    end;
321                {error, Reason} ->
322                    {stop, Reason}
323                end;
324            {error, Reason} ->
325                {stop, Reason}
326            end;
327        {error, Reason} ->
328            {stop, Reason}
329        end;
330    {error, Reason} ->
331        {stop, {error, {dcp_socket_connect_failed, Reason}}}
332    end.
333
334
% Register the stream request. The caller is answered later, when the
% server's response arrives (see the stream_response handler), unless
% add_new_stream/3 fails immediately.
handle_call({add_stream, PartId, PartUuid, StartSeq, EndSeq, Flags},
        From, State) ->
    % The snapshot range is initialised to {StartSeq, StartSeq}.
    Snapshot = {StartSeq, StartSeq},
    StreamReq = {PartId, PartUuid, StartSeq, EndSeq, Snapshot, Flags},
    case add_new_stream(StreamReq, From, State) of
    {error, _} = Failure ->
        {reply, Failure, State};
    NewState ->
        {noreply, NewState}
    end;
347
% Forget the stream metadata of PartId and send a stream-close request. The
% caller is answered when the server responds; a send failure is returned
% immediately.
handle_call({remove_stream, PartId}, From, State) ->
    State2 = remove_stream_info(PartId, State),
    #state{request_id = ReqId, bufsocket = Sock} = State2,
    CloseReq = couch_dcp_consumer:encode_stream_close(PartId, ReqId),
    case bufsocket_send(Sock, CloseReq) of
    {error, _Reason} = SendError ->
        {reply, SendError, State2};
    ok ->
        Advanced = next_request_id(State2),
        {noreply, add_pending_request(Advanced, ReqId, {remove_stream, PartId}, From)}
    end;
364
% Reply with the partition ids of active_streams, in reverse list order.
handle_call(list_streams, _From, State) ->
    #state{active_streams = Streams} = State,
    PartIds = lists:reverse(lists:map(fun({PartId, _}) -> PartId end, Streams)),
    {reply, PartIds, State};
371
% Send a failover-log request for PartId; the caller is answered when the
% server responds, or immediately with the send result if sending fails.
handle_call({get_failover_log, PartId}, From, State) ->
    #state{request_id = ReqId, bufsocket = Sock} = State,
    Request = couch_dcp_consumer:encode_failover_log_request(PartId, ReqId),
    case bufsocket_send(Sock, Request) of
    ok ->
        Advanced = next_request_id(State),
        {noreply, add_pending_request(Advanced, ReqId, get_failover_log, From)};
    SendError ->
        {reply, SendError, State}
    end;
387
% Only used by unit test: current total buffer size.
handle_call(get_buffer_size, _From, #state{total_buffer_size = Total} = State) ->
    {reply, Total, State};

% Only used by unit test: reply with the total buffer size and reset it to 0.
handle_call(reset_buffer_size, _From, #state{total_buffer_size = Total} = State) ->
    {reply, Total, State#state{total_buffer_size = 0}};

% Only used by unit test: drop all stored stream metadata.
handle_call(flush_old_streams_meta, _From, State) ->
    EmptyInfo = dict:new(),
    {reply, ok, State#state{stream_info = EmptyInfo}};

% Only used by unit test: expose the buffered socket.
handle_call(get_socket, _From, #state{bufsocket = Sock} = State) ->
    {reply, Sock, State};

% Only used by unit test: size of the events queued for RequestId.
handle_call({get_buffer_size, RequestId}, _From,
        #state{stream_queues = Queues} = State) ->
    {ok, {_Waiter, EventQueue}} = dict:find(RequestId, Queues),
    {reply, get_queue_size(EventQueue, 0), State};

% Only used by unit test: size of a single event.
handle_call({get_event_size, Event}, _From, State) ->
    {reply, get_event_size(Event), State}.
416
417% If a stream event for this requestId is present in the queue,
418% dequeue it and reply back to the caller.
419% Else, put the caller into the stream queue waiter list
% stream_event_present/2 returns true (an event is queued), false (presumably
% a queue exists but is empty) or nil (no queue for RequestId, reported as
% {error, vbucket_stream_not_found}). Except when the caller is registered as
% a waiter, it is sent exactly one {stream_event, RequestId, Reply} here. A
% registered waiter is answered later by the stream_event handler.
420handle_info({get_stream_event, RequestId, From}, State) ->
421    case stream_event_present(State, RequestId) of
422    true ->
423        Event = peek_stream_event(State, RequestId),
424        {Optype, _, _} = Event,
        % The buffer ack is sent before dequeuing. If it fails, the event
        % stays queued (original State is kept) and the caller gets
        % {error, Reason}.
425        {Msg, State6} = case check_and_send_buffer_ack(State, RequestId, Event, mutation) of
426        {ok, State3} ->
            % Event is already bound, so this asserts that the dequeued
            % event is the one peeked above.
427            {State4, Event} = dequeue_stream_event(State3, RequestId),
            % stream_end and error are terminal for the stream: drop its queue.
428            State5 = case Optype =:= stream_end orelse Optype =:= error of
429            true ->
430                remove_request_queue(State4, RequestId);
431            _ ->
432                State4
433            end,
434            {remove_body_len(Event), State5};
435        {error, Reason} ->
436            {{error, Reason}, State}
437        end,
438        From ! {stream_event, RequestId, Msg},
439        {noreply, State6};
440    false ->
        % add_stream_event_waiter/3 returns nil when a waiter is already
        % registered for RequestId.
441        case add_stream_event_waiter(State, RequestId, From) of
442        nil ->
443            Reply = {error, event_request_already_exists},
444            From ! {stream_event, RequestId, Reply},
445            {noreply, State};
446        State2 ->
447            {noreply, State2}
448        end;
449    nil ->
450        Reply = {error, vbucket_stream_not_found},
451        From ! {stream_event, RequestId, Reply},
452        {noreply, State}
453    end;
454
455
% Send a stat request for get_stats/3. From is {MRef, CallerPid}; on success
% the reply is delivered later by the stream_response handler as
% {get_stats, MRef, Reply}.
%
% If the request cannot be written to the socket, the send error is sent
% straight back to the waiting caller in the same message format.
% handle_info/2 must not return a {reply, ...} tuple: doing so made
% gen_server stop with bad_return_value and left the caller waiting for its
% 'DOWN' message.
handle_info({get_stats, Stat, PartId, From}, State) ->
    #state{
       request_id = RequestId,
       bufsocket = BufSocket
    } = State,
    SeqStatRequest = couch_dcp_consumer:encode_stat_request(
        Stat, PartId, RequestId),
    case bufsocket_send(BufSocket, SeqStatRequest) of
    ok ->
        State2 = next_request_id(State),
        State3 = add_pending_request(State2, RequestId, get_stats, From),
        {noreply, State3};
    Error ->
        {MRef, Caller} = From,
        Caller ! {get_stats, MRef, Error},
        {noreply, State}
    end;
471
% Send an all-seqs request for get_all_seqs/1 or get_seqs_async/1. From is
% {MRef, CallerPid}, where MRef is nil for the async API; on success the
% reply is delivered later by the stream_response handler as
% {all_seqs, MRef, Reply}.
%
% If the request cannot be written to the socket, the send error is sent
% straight back to the caller in that same format (so async callers get
% {all_seqs, nil, Error}). handle_info/2 must not return a {reply, ...} tuple:
% doing so made gen_server stop with bad_return_value.
handle_info({get_all_seqs, From}, State) ->
    #state{
       request_id = RequestId,
       bufsocket = BufSocket
    } = State,
    SeqStatRequest = couch_dcp_consumer:encode_all_seqs_request(RequestId),
    case bufsocket_send(BufSocket, SeqStatRequest) of
    ok ->
        State2 = next_request_id(State),
        State3 = add_pending_request(State2, RequestId, all_seqs, From),
        {noreply, State3};
    Error ->
        {MRef, Caller} = From,
        Caller ! {all_seqs, MRef, Error},
        {noreply, State}
    end;
486
487% Handle response message send by connection receiver worker
488% Reply back to waiting callers
% find_pending_request/2 yields {ReqInfo, SendTo} for a known request id, or
% nil for an unknown one (state left unchanged). ReqInfo selects how Msg is
% delivered:
%   {add_stream, PartId}    - gen_server:reply; on {_, {failoverlog, _}} a
%                             request queue is created for the new stream
%   {remove_stream, PartId} - gen_server:reply, ack buffered data, and on ok /
%                             vbucket_stream_not_found drop the stream's queue
%   {control_request, Size} - internal request: store Size as max_buffer_size
%   get_stats / all_seqs    - SendTo is {MRef, Pid}; Msg is sent as a plain
%                             {get_stats | all_seqs, MRef, Msg} message
%   anything else           - gen_server:reply
% The handled entry is then removed from pending_requests.
% NOTE(review): a {_, nil} result leaves the state untouched and does not
% remove the pending entry either -- confirm this is intended.
489handle_info({stream_response, RequestId, Msg}, State) ->
490    State3 = case find_pending_request(State, RequestId) of
491    {_, nil} ->
492        State;
493    {ReqInfo, SendTo} ->
494        State2 = case ReqInfo of
495        {add_stream, PartId} ->
496            gen_server:reply(SendTo, Msg),
497            case Msg of
498            {_, {failoverlog, _}} ->
499                add_request_queue(State, PartId, RequestId);
500            _ ->
501                State
502            end;
503        {remove_stream, PartId} ->
504            gen_server:reply(SendTo, Msg),
505            StreamReqId = find_stream_req_id(State, PartId),
506            case check_and_send_buffer_ack(State, StreamReqId, nil, remove_stream) of
507            {ok, NewState} ->
508                case Msg of
509                ok ->
510                    remove_request_queue(NewState, StreamReqId);
511                {error, vbucket_stream_not_found} ->
512                    remove_request_queue(NewState, StreamReqId);
513                _ ->
514                    NewState
515                end;
516            {error, Error} ->
                % NOTE(review): gen_server treats a term thrown from a
                % callback as the callback's return value, so this stops the
                % server with {bad_return_value, {control_ack_failed, Error}}.
                % The State expression below is never reached.
517                throw({control_ack_failed, Error}),
518                State
519            end;
520        % Server sent the response for the internal control request
521        {control_request, Size} ->
522            State#state{max_buffer_size = Size};
523        get_stats ->
524            {MRef, From} = SendTo,
525            From ! {get_stats, MRef, Msg},
526            State;
527        all_seqs ->
528            {MRef, From} = SendTo,
529            From ! {all_seqs, MRef, Msg},
530            State;
531        _ ->
532            gen_server:reply(SendTo, Msg),
533            State
534        end,
535        remove_pending_request(State2, RequestId);
536    nil ->
537        State
538    end,
539    {noreply, State3};
540
% Answer a NOOP sent by the server. The send result is deliberately ignored:
% per the original note, the server closes the connection itself if NOOP
% responses fail twice.
handle_info({stream_noop, RequestId}, State) ->
    #state{bufsocket = Sock} = State,
    _ = bufsocket_send(Sock, couch_dcp_consumer:encode_noop_response(RequestId)),
    {noreply, State};
550
551% Handle events send by connection receiver worker
552% If there is a waiting caller for stream event, reply to them
553% Else, queue the event into the stream queue
% Event is {Optype, Data, Length}. Stream metadata is updated first:
% stream_end and error erase the stream_info entry for RequestId, while
% snapshot markers, mutations and deletions update the stored snapshot data.
% Any other Optype matches no clause and crashes with case_clause.
% stream_event_waiters_present/2 returns true (a waiter is registered),
% false (no waiter: the event is queued) or nil (no request queue exists).
554handle_info({stream_event, RequestId, Event}, State) ->
555    {Optype, Data, _Length} = Event,
556    State2 = case Optype of
557    Optype when Optype =:= stream_end orelse Optype =:= error ->
558        #state{
559            stream_info = StreamData
560        } = State,
561        StreamData2 = dict:erase(RequestId, StreamData),
562        State#state{stream_info = StreamData2};
563    snapshot_marker ->
564        store_snapshot_seq(RequestId, Data, State);
565    snapshot_mutation ->
566        store_snapshot_mutation(RequestId, Data, State);
567    snapshot_deletion ->
568        store_snapshot_mutation(RequestId, Data, State)
569    end,
570    case stream_event_waiters_present(State2, RequestId) of
571    true ->
572        {State3, Waiter} = remove_stream_event_waiter(State2, RequestId),
573        {Msg, State6} = case check_and_send_buffer_ack(State3, RequestId, Event, mutation) of
574        {ok, State4} ->
            % stream_end and error are terminal: drop the request queue.
575            State5 = case Optype =:= stream_end orelse Optype =:= error of
576            true ->
577                remove_request_queue(State4, RequestId);
578            _ ->
579                State4
580            end,
581            {remove_body_len(Event), State5};
582        {error, Reason} ->
            % Ack failed: keep the event queued and report the error to the
            % waiter, which has already been removed from the waiter list.
583            State4 = enqueue_stream_event(State3, RequestId, Event),
584            {{error, Reason}, State4}
585        end,
586        Waiter ! {stream_event, RequestId, Msg},
587        {noreply, State6};
588    false ->
589        State3 = enqueue_stream_event(State2, RequestId, Event),
590        {noreply, State3};
591    nil ->
592        % We might have explicitly closed a stream using close_stream command
593        % Before the server received close_stream message, it would have placed
594        % some mutations in the network buffer queue. We still need to acknowledge
595        % the mutations received.
        % NOTE(review): this acks against State rather than State2, so the
        % stream_info / snapshot updates computed above are discarded in
        % this branch. An ack failure here crashes with badmatch.
596        {ok, State3} = check_and_send_buffer_ack(State, RequestId, Event, mutation),
597        {noreply, State3}
598    end;
599
% The receive worker reported a connection error: log it, back off for
% ?DCP_RETRY_TIMEOUT ms and restart the worker.
handle_info({'EXIT', Pid, {conn_error, Reason}}, #state{worker_pid = Pid} = State) ->
    [Name, Bucket, _, _, _, _] = State#state.args,
    ?LOG_ERROR("dcp client (~s, ~s): dcp receive worker failed due to reason: ~p."
        " Restarting dcp receive worker...",
        [Bucket, Name, Reason]),
    ok = timer:sleep(?DCP_RETRY_TIMEOUT),
    restart_worker(State);

% Any other exit of the receive worker stops the client with the same reason.
handle_info({'EXIT', Pid, Reason}, #state{worker_pid = Pid} = State) ->
    {stop, Reason, State};
610
% A caller of get_stream_event/2 has waited ?TIMEOUT ms; log what is known
% about the stream it is waiting on.
handle_info({print_log, ReqId}, State) ->
    [Name, Bucket, _, _, _, _] = State#state.args,
    WaitedSecs = ?TIMEOUT / 1000,
    case find_stream_info(ReqId, State) of
    nil ->
        ?LOG_ERROR(
            "dcp client (~s, ~s): Obtaining message from server timed out "
            "after ~p seconds [RequestId ~p]. Waiting...",
            [Bucket, Name, WaitedSecs, ReqId]);
    Info ->
        #stream_info{part_id = PartId, start_seq = From, end_seq = To} = Info,
        ?LOG_ERROR("dcp client (~s, ~s): Obtaining mutation from server timed out "
            "after ~p seconds [RequestId ~p, PartId ~p, StartSeq ~p, EndSeq ~p]. Waiting...",
            [Bucket, Name, WaitedSecs, ReqId, PartId, From, To])
    end,
    {noreply, State};

% Any other message is unexpected and stops the client.
handle_info(Unexpected, State) ->
    {stop, {unexpected_info, Unexpected}, State}.
633
% Casts are not part of this server's protocol; any cast stops it.
-spec handle_cast(any(), #state{}) ->
                         {stop, {unexpected_cast, any()}, #state{}}.
handle_cast(Cast, State) ->
    StopReason = {unexpected_cast, Cast},
    {stop, StopReason, State}.
638
639
% Shut down the linked receive worker together with the client.
-spec terminate(any(), #state{}) -> ok.
terminate(_Reason, #state{worker_pid = Worker}) ->
    true = exit(Worker, shutdown),
    ok.
644
645
% No state migration is performed on code upgrade.
-spec code_change(any(), #state{}, any()) -> {ok, #state{}}.
code_change(_Vsn, UnchangedState, _ExtraInfo) ->
    {ok, UnchangedState}.
649
650
% Custom state formatting for sys:get_status/1 and termination reports.
% Each stream queue is summarised as {Waiter, {Length, LastItem}} instead of
% dumping every buffered event. The admin password stored in #state.args
% (kept there for reconnecting) is masked so it never reaches the logs.
format_status(_Opt, [_PDict, #state{stream_queues = StreamQueues, args = Args} = State]) ->
    TransformFn = fun(_Key, {Waiter, Queue}) ->
                     {Waiter, {queue:len(Queue), queue:peek_r(Queue)}}
                  end,
    SafeArgs = case Args of
    [Name, Bucket, AdmUser, _AdmPasswd, BufferSize, Flags] ->
        [Name, Bucket, AdmUser, <<"********">>, BufferSize, Flags];
    _ ->
        Args
    end,
    State#state{
        stream_queues = dict:map(TransformFn, StreamQueues),
        args = SafeArgs
    }.
656
657
658% Internal functions
659
% Authenticate the connection with SASL as User / Passwd.
% Synchronously sends the auth request, then reads the response header and
% body. The header must echo RequestId (enforced by matching against the
% already-bound variable). Returns {ok, State2} with request_id advanced by
% one and the updated buffered socket, {error, sasl_auth_failed}, or a socket
% error.
% NOTE(review): a status other than OK / SASL_AUTH_FAILED raises case_clause.
660-spec sasl_auth(binary(), binary(), #state{}) -> {ok, #state{}} |
661                            {error, sasl_auth_failed | closed | inet:posix()}.
662sasl_auth(User, Passwd, State) ->
663    #state{
664        bufsocket = BufSocket,
665        timeout = DcpTimeout,
666        request_id = RequestId
667    } = State,
668    Authenticate = couch_dcp_consumer:encode_sasl_auth(User, Passwd, RequestId),
669    case bufsocket_send(BufSocket, Authenticate) of
670    ok ->
671        case bufsocket_recv(BufSocket, ?DCP_HEADER_LEN, DcpTimeout) of
672        {ok, Header, BufSocket2} ->
673            {sasl_auth, Status, RequestId, BodyLength} =
674                couch_dcp_consumer:parse_header(Header),
675            % Receive the body so that it is not mangled with the next request,
676            % we care about the status only though
677            case bufsocket_recv(BufSocket2, BodyLength, DcpTimeout) of
678            {ok, _, BufSocket3} ->
679                case Status of
680                ?DCP_STATUS_OK ->
681                    {ok, State#state{
682                        request_id = RequestId + 1,
683                        bufsocket = BufSocket3
684                    }};
685                ?DCP_STATUS_SASL_AUTH_FAILED ->
686                    {error, sasl_auth_failed}
687                end;
688            {error, _} = Error ->
689                Error
690            end;
691        {error, _} = Error ->
692            Error
693        end;
694    {error, _} = Error ->
695        Error
696    end.
697
% Select Bucket on the connection.
%
% Returns {ok, State2} with request_id advanced by one and the updated
% buffered socket, or {error, Reason}. A "not supported" error status counts
% as success: when authentication already used the bucket name, that bucket
% is already selected.
%
% Any other error status is returned as {error, Reason}, as the spec says.
% Previously the {{error, Reason}, BufSocket} pair from
% parse_error_response/4 was returned as is, which init/1 (matching only
% {ok, _} | {error, _}) could not handle and crashed on with case_clause.
-spec select_bucket(binary(), #state{}) -> {ok, #state{}} | {error, term()}.
select_bucket(Bucket, State) ->
    #state{
        bufsocket = BufSocket,
        timeout = DcpTimeout,
        request_id = RequestId
    } = State,
    SelectBucket = couch_dcp_consumer:encode_select_bucket(Bucket, RequestId),
    case bufsocket_send(BufSocket, SelectBucket) of
    ok ->
        case bufsocket_recv(BufSocket, ?DCP_HEADER_LEN, DcpTimeout) of
        {ok, Header, BufSocket2} ->
            % The response must carry our RequestId (already bound).
            {select_bucket, Status, RequestId, BodyLength} =
                                    couch_dcp_consumer:parse_header(Header),
            case Status of
            ?DCP_STATUS_OK ->
                {ok, State#state{
                    request_id = RequestId + 1,
                    bufsocket = BufSocket2
                }};
            _ ->
                case parse_error_response(
                        BufSocket2, DcpTimeout, BodyLength, Status) of
                % When the authentication happened with bucket name and
                % password, then the correct bucket is already selected. In
                % this case a select bucket command returns "not supported".
                {{error, not_supported}, BufSocket3} ->
                    {ok, State#state{
                        request_id = RequestId + 1,
                        bufsocket = BufSocket3
                    }};
                {{error, _} = Error, _BufSocket3} ->
                    Error
                end
            end;
        {error, _} = Error ->
            Error
        end;
    {error, _} = Error ->
        Error
    end.
739
% Open the DCP connection Name with Flags. The response header must echo
% the request id (a mismatch crashes). On success the request id is advanced
% and the updated buffered socket is stored.
-spec open_connection(binary(), non_neg_integer(), #state{}) ->
    {ok, #state{}} | {error, term()}.
open_connection(Name, Flags, State) ->
    #state{bufsocket = Sock, timeout = Timeout, request_id = ReqId} = State,
    Request = couch_dcp_consumer:encode_open_connection(Name, Flags, ReqId),
    case bufsocket_send(Sock, Request) of
    {error, _} = SendError ->
        SendError;
    ok ->
        case bufsocket_recv(Sock, ?DCP_HEADER_LEN, Timeout) of
        {error, _} = RecvError ->
            RecvError;
        {ok, Header, Sock2} ->
            {open_connection, ReqId} = couch_dcp_consumer:parse_header(Header),
            {ok, next_request_id(State#state{bufsocket = Sock2})}
        end
    end.
763
764
% Read a snapshot-marker body of BodyLength bytes and return its
% {StartSeq, EndSeq, Type} together with the updated buffered socket.
-spec receive_snapshot_marker(#bufsocket{}, timeout(),  size()) ->
                                     {ok, {update_seq(), update_seq(),
                                           non_neg_integer()}, #bufsocket{}} |
                                     {error, closed}.
receive_snapshot_marker(BufSocket, Timeout, BodyLength) ->
    case bufsocket_recv(BufSocket, BodyLength, Timeout) of
    {error, _} = Error ->
        Error;
    {ok, Body, NextBufSocket} ->
        {snapshot_marker, FromSeq, ToSeq, MarkerType} =
            couch_dcp_consumer:parse_snapshot_marker(Body),
        {ok, {FromSeq, ToSeq, MarkerType}, NextBufSocket}
    end.
778
% Read a mutation body of BodyLength bytes and convert it into a
% non-deleted #dcp_doc{} for partition PartId, using the Cas and DataType
% taken from the header.
-spec receive_snapshot_mutation(#bufsocket{}, timeout(), partition_id(), size(),
                                size(), size(), uint64(), dcp_data_type()) ->
                                {#dcp_doc{}, #bufsocket{}} | {error, closed}.
receive_snapshot_mutation(BufSocket, Timeout, PartId, KeyLength, BodyLength,
        ExtraLength, Cas, DataType) ->
    case bufsocket_recv(BufSocket, BodyLength, Timeout) of
    {error, _} = Error ->
        Error;
    {ok, Body, NextBufSocket} ->
        {snapshot_mutation, #mutation{} = Mut} =
            couch_dcp_consumer:parse_snapshot_mutation(KeyLength, Body,
                BodyLength, ExtraLength),
        Doc = #dcp_doc{
            id = Mut#mutation.key,
            body = Mut#mutation.value,
            data_type = DataType,
            partition = PartId,
            cas = Cas,
            rev_seq = Mut#mutation.rev_seq,
            seq = Mut#mutation.seq,
            flags = Mut#mutation.flags,
            expiration = Mut#mutation.expiration,
            deleted = false
        },
        {Doc, NextBufSocket}
    end.
812
% Read a deletion body of BodyLength bytes and convert it into a deleted
% #dcp_doc{} for partition PartId. The document body carries the deletion's
% XATTRs; flags and expiration are 0.
-spec receive_snapshot_deletion(#bufsocket{}, timeout(), partition_id(), size(),
                                size(), uint64(), dcp_data_type()) ->
                                        {#dcp_doc{}, #bufsocket{}} |
                                        {error, closed | inet:posix()}.
receive_snapshot_deletion(BufSocket, Timeout, PartId, KeyLength, BodyLength,
        Cas, DataType) ->
    case bufsocket_recv(BufSocket, BodyLength, Timeout) of
    {error, _} = Error ->
        Error;
    {ok, Body, NextBufSocket} ->
        {snapshot_deletion, {Seq, RevSeq, Key, _Metadata, XATTRs}} =
            couch_dcp_consumer:parse_snapshot_deletion(KeyLength, Body),
        Doc = #dcp_doc{
            id = Key,
            body = XATTRs,
            data_type = DataType,
            partition = PartId,
            cas = Cas,
            rev_seq = RevSeq,
            seq = Seq,
            flags = 0,
            expiration = 0,
            deleted = true
        },
        {Doc, NextBufSocket}
    end.
839
% Reads the body of a stream-end message. The raw body (the end-of-stream
% flag) is returned undecoded together with the updated buffer; a failed read
% is passed through as {error, Reason}.
-spec receive_stream_end(#bufsocket{}, timeout(), size()) ->
            {<<_:32>>, #bufsocket{}} | {error, closed | inet:posix()}.
receive_stream_end(BufSocket, Timeout, BodyLength) ->
    case bufsocket_recv(BufSocket, BodyLength, Timeout) of
    {error, _Reason} = RecvError ->
        RecvError;
    {ok, FlagBin, NextBufSocket} ->
        {FlagBin, NextBufSocket}
    end.
849
850
% Returns the failover log as a list of 2-tuple pairs of partition UUID and
% sequence number, decoded by couch_dcp_consumer:parse_failover_log/1.
% Result shapes:
%   - {{ok, FailoverLog}, BufSocket2} on success;
%   - {error, no_failover_log_found} when the body is empty (checked before
%     the status);
%   - {error, Status} for a non-OK status. The body is NOT consumed in this
%     case.
%   - {error, Reason} when the body cannot be read.
-spec receive_failover_log(#bufsocket{}, timeout(), char(), size()) ->
        {{'ok', list(partition_version())}, #bufsocket{}} |
        {error, no_failover_log_found | char() | closed | inet:posix()}.
receive_failover_log(_BufSocket, _Timeout, _Status, 0) ->
    {error, no_failover_log_found};
receive_failover_log(BufSocket, Timeout, Status, BodyLength) ->
    case Status of
    ?DCP_STATUS_OK ->
        case bufsocket_recv(BufSocket, BodyLength, Timeout) of
        {ok, Body, BufSocket2} ->
            {couch_dcp_consumer:parse_failover_log(Body), BufSocket2};
        {error, _} = Error ->
            Error
        end;
    _ ->
        {error, Status}
    end.
869
% Reads the body of a ROLLBACK stream-request reply: one sequence number of
% ?DCP_SIZES_BY_SEQ bits, returned as {ok, RollbackSeq, BufSocket2}.
% A body of any other size matches no clause (case_clause). A failed read
% is passed through as {error, Reason}.
-spec receive_rollback_seq(#bufsocket{}, timeout(), size()) ->
        {ok, update_seq(), #bufsocket{}} | {error, closed | inet:posix()}.
receive_rollback_seq(BufSocket, Timeout, BodyLength) ->
    case bufsocket_recv(BufSocket, BodyLength, Timeout) of
    {error, _Reason} = RecvError ->
        RecvError;
    {ok, <<Seq:?DCP_SIZES_BY_SEQ>>, NextBufSocket} ->
        {ok, Seq, NextBufSocket}
    end.
879
880
% Reads one stat packet body and decodes it with couch_dcp_consumer:parse_stat/4.
% The first KeyLength bytes of the body are the key; the remaining
% BodyLength - KeyLength bytes are the value.
% Returns {ParseResult, BufSocket2}, where ParseResult is whatever parse_stat/4
% yields ({ok, {Key, Value}} or {error, {Status, Body}}). A failed read is
% passed through as {error, Reason}.
-spec receive_stat(#bufsocket{}, timeout(), dcp_status(), size(), size()) ->
        {{ok, {binary(), binary()}} | {error, {dcp_status(), binary()}},
         #bufsocket{}} |
        {error, closed | inet:posix()}.
receive_stat(BufSocket, Timeout, Status, BodyLength, KeyLength) ->
    case bufsocket_recv(BufSocket, BodyLength, Timeout) of
    {ok, Body, BufSocket2} ->
        {couch_dcp_consumer:parse_stat(
            Body, Status, KeyLength, BodyLength - KeyLength), BufSocket2};
    {error, Reason} ->
        {error, Reason}
    end.
894
% Reads the body of an all-seqs reply and decodes it with
% couch_dcp_consumer:parse_all_seqs/3.
% Returns {ParseResult, BufSocket2}. A failed read is passed through as
% {error, Reason}.
-spec receive_all_seqs(#bufsocket{}, timeout(), dcp_status(), size()) ->
        {{ok, list()} | {error, {dcp_status(), binary()}}, #bufsocket{}} |
        {error, closed | inet:posix()}.
receive_all_seqs(BufSocket, Timeout, Status, BodyLength) ->
    case bufsocket_recv(BufSocket, BodyLength, Timeout) of
    {ok, Body, BufSocket2} ->
        {couch_dcp_consumer:parse_all_seqs(Status, Body, []), BufSocket2};
    {error, Reason} ->
        {error, Reason}
    end.
906
% Pulls events for stream RequestId from Pid (get_stream_event/2) and folds
% them into the accumulator with CallbackFn until the stream finishes.
%   - Snapshot markers are handed to CallbackFn as {snapshot_marker, Marker}.
%   - Every other event's payload is handed over unchanged.
% Returns {ok, Acc} on stream_end, or {error, Reason} when an error event
% arrives.
-spec receive_events(pid(), request_id(), mutations_fold_fun(),
                     mutations_fold_acc()) -> {ok, mutations_fold_acc()} |
                                              {error, term()}.
receive_events(Pid, RequestId, CallbackFn, InAcc) ->
    {EventType, Payload} = get_stream_event(Pid, RequestId),
    case EventType of
    error ->
        {error, Payload};
    stream_end ->
        {ok, InAcc};
    _ ->
        CallbackArg = case EventType of
        snapshot_marker ->
            {snapshot_marker, Payload};
        _ ->
            Payload
        end,
        NextAcc = CallbackFn(CallbackArg, InAcc),
        receive_events(Pid, RequestId, CallbackFn, NextAcc)
    end.
924
% Writes Data to the raw socket via gen_tcp:send/2 and returns its result.
-spec socket_send(socket(), iodata()) ->
        ok | {error, closed | inet:posix()}.
socket_send(Sock, Data) ->
    gen_tcp:send(Sock, Data).
929
% Sends Packet over the socket held in the #bufsocket{}. Sending does not
% touch sockbuf; the packet goes straight to socket_send/2.
-spec bufsocket_send(#bufsocket{}, iodata()) ->
        ok | {error, closed | inet:posix()}.
bufsocket_send(BufSocket, Packet) ->
    Sock = BufSocket#bufsocket.sockpid,
    socket_send(Sock, Packet).
934
% Waits for the next chunk of data delivered as a message for SockPid.
% Result shapes:
%   - {ok, Data} for {tcp, SockPid, Data};
%   - {error, closed} for {tcp_closed, SockPid};
%   - {error, Reason} for {tcp_error, SockPid, Reason}.
% NOTE: if nothing arrives within Timeout, this returns {ok, <<>>} rather
% than an error. bufsocket_recv/3 then simply waits again, so a buffered read
% never fails because of a timeout.
-spec socket_recv(socket(), timeout()) ->
        {ok, binary()} | {error, closed | inet:posix()}.
socket_recv(SockPid, Timeout) ->
    receive
    {tcp_error, SockPid, SockErr} ->
        {error, SockErr};
    {tcp_closed, SockPid} ->
        {error, closed};
    {tcp, SockPid, Chunk} ->
        {ok, Chunk}
    after Timeout ->
        {ok, <<>>}
    end.
948
% Returns exactly Length bytes from the buffered socket as
% {ok, Bytes, BufSocket2}. Any surplus bytes stay in sockbuf for the next
% read.
% While the buffer holds fewer than Length bytes, socket_recv/2 is polled
% (with Timeout) and each chunk is appended before retrying. A socket close
% or error is returned as {error, Reason}.
-spec bufsocket_recv(#bufsocket{}, size(), timeout()) ->
        {ok, binary(), #bufsocket{}} | {error, closed | inet:posix()}.
bufsocket_recv(BufSocket, 0, _Timeout) ->
    {ok, <<>>, BufSocket};
bufsocket_recv(#bufsocket{sockbuf = Buffered} = BufSocket, Length, _Timeout)
        when byte_size(Buffered) >= Length ->
    <<Wanted:Length/binary, Leftover/binary>> = Buffered,
    {ok, Wanted, BufSocket#bufsocket{sockbuf = Leftover}};
bufsocket_recv(#bufsocket{sockbuf = Buffered} = BufSocket, Length, Timeout) ->
    case socket_recv(BufSocket#bufsocket.sockpid, Timeout) of
    {error, _Reason} = RecvError ->
        RecvError;
    {ok, Chunk} ->
        Grown = BufSocket#bufsocket{sockbuf = <<Buffered/binary, Chunk/binary>>},
        bufsocket_recv(Grown, Length, Timeout)
    end.
970
% Records an outstanding request under RequestId as {ReqInfo, From}, so that
% its reply can later be looked up with find_pending_request/2.
% ReqInfo describes the request. From is whoever should receive the reply,
% or nil when no reply is expected.
-spec add_pending_request(#state{}, request_id(), term(), nil | {pid(), term()}) -> #state{}.
add_pending_request(State, RequestId, ReqInfo, From) ->
    #state{pending_requests = Pending} = State,
    Pending2 = dict:store(RequestId, {ReqInfo, From}, Pending),
    State#state{pending_requests = Pending2}.
978
% Forgets the pending request RequestId. Unknown ids leave the state unchanged.
-spec remove_pending_request(#state{}, request_id()) -> #state{}.
remove_pending_request(State, RequestId) ->
    #state{pending_requests = Pending} = State,
    State#state{pending_requests = dict:erase(RequestId, Pending)}.
985
986
% Looks up the {ReqInfo, From} entry stored for RequestId, or nil when there
% is none.
-spec find_pending_request(#state{}, request_id()) -> nil | {term(), nil | {pid(), term()}}.
find_pending_request(State, RequestId) ->
    #state{pending_requests = Pending} = State,
    case dict:find(RequestId, Pending) of
    {ok, ReqEntry} ->
        ReqEntry;
    error ->
        nil
    end.
998
% Advances the request-id counter by one. It wraps back to 0 once the next
% value would reach 1 bsl ?DCP_SIZES_OPAQUE, i.e. would no longer fit in
% that many bits.
-spec next_request_id(#state{}) -> #state{}.
next_request_id(#state{request_id = RequestId} = State) ->
    NextId = if
    RequestId + 1 < (1 bsl ?DCP_SIZES_OPAQUE) ->
        RequestId + 1;
    true ->
        0
    end,
    State#state{request_id = NextId}.
1008
% Unregisters the stream RequestId by dropping it from active_streams and
% discarding its event queue.
% If this leaves no active streams, every byte still counted in
% total_buffer_size is acknowledged first via send_buffer_ack/1. A failed
% send crashes with badmatch.
-spec remove_request_queue(#state{}, request_id()) -> #state{}.
remove_request_queue(State, RequestId) ->
    #state{
        active_streams = ActiveStreams,
        stream_queues = StreamQueues
    } = State,
    ActiveStreams2 = lists:keydelete(RequestId, 2, ActiveStreams),
    % All active streams have finished reading: ack the remaining unacked
    % bytes. Matching [] avoids walking the whole list with length/1.
    State2 = case ActiveStreams2 of
    [] ->
        {ok, AckedState} = send_buffer_ack(State),
        AckedState;
    _ ->
        State
    end,
    StreamQueues2 = dict:erase(RequestId, StreamQueues),
    State2#state{
        active_streams = ActiveStreams2,
        stream_queues = StreamQueues2
    }.
1031
1032
% Registers a new stream. {PartId, RequestId} is prepended to
% active_streams, and RequestId gets an empty event queue with no waiter
% ({nil, queue:new()}).
-spec add_request_queue(#state{}, partition_id(), request_id()) -> #state{}.
add_request_queue(State, PartId, RequestId) ->
    #state{
        active_streams = Streams,
        stream_queues = Queues
    } = State,
    EmptyEntry = {nil, queue:new()},
    State#state{
        active_streams = [{PartId, RequestId} | Streams],
        stream_queues = dict:store(RequestId, EmptyEntry, Queues)
    }.
1045
1046
% Appends Event to the tail of RequestId's event queue; the waiter is kept.
% Crashes with badmatch when no queue exists for RequestId.
-spec enqueue_stream_event(#state{}, request_id(), tuple()) -> #state{}.
enqueue_stream_event(State, RequestId, Event) ->
    #state{stream_queues = Queues} = State,
    {ok, {Waiter, Events}} = dict:find(RequestId, Queues),
    Events2 = queue:in(Event, Events),
    Queues2 = dict:store(RequestId, {Waiter, Events2}, Queues),
    State#state{stream_queues = Queues2}.
1059
% Pops the oldest event from RequestId's queue and returns {State2, Event}.
% Crashes with badmatch if the queue is unknown or empty.
-spec dequeue_stream_event(#state{}, request_id()) ->
                               {#state{}, tuple()}.
dequeue_stream_event(State, RequestId) ->
    #state{stream_queues = Queues} = State,
    {ok, {Waiter, Events}} = dict:find(RequestId, Queues),
    {{value, Head}, Remaining} = queue:out(Events),
    Queues2 = dict:store(RequestId, {Waiter, Remaining}, Queues),
    {State#state{stream_queues = Queues2}, Head}.
1073
% Returns the oldest event in RequestId's queue without removing it.
% Crashes with badmatch if the queue is unknown or empty.
-spec peek_stream_event(#state{}, request_id()) -> tuple().
peek_stream_event(State, RequestId) ->
    #state{stream_queues = Queues} = State,
    {ok, {_Waiter, Events}} = dict:find(RequestId, Queues),
    {value, Head} = queue:peek(Events),
    Head.
1082
% Installs NewWaiter on RequestId's queue when no waiter is set yet and
% returns the updated state. Returns nil if a waiter is already present.
-spec add_stream_event_waiter(#state{}, request_id(), term()) -> #state{} | nil.
add_stream_event_waiter(State, RequestId, NewWaiter) ->
    #state{stream_queues = Queues} = State,
    {ok, {CurrentWaiter, Events}} = dict:find(RequestId, Queues),
    case CurrentWaiter =:= nil of
    true ->
        Queues2 = dict:store(RequestId, {NewWaiter, Events}, Queues),
        State#state{stream_queues = Queues2};
    false ->
        nil
    end.
1098
1099
% Tells whether RequestId has at least one queued event: true/false, or nil
% when no queue exists for RequestId.
-spec stream_event_present(#state{}, request_id()) -> nil | true | false.
stream_event_present(State, RequestId) ->
    #state{stream_queues = Queues} = State,
    case dict:find(RequestId, Queues) of
    {ok, {_Waiter, Events}} ->
        not queue:is_empty(Events);
    error ->
        nil
    end.
1111
1112
% Tells whether a waiter is registered for RequestId: true/false, or nil
% when no queue exists for RequestId.
-spec stream_event_waiters_present(#state{}, request_id()) -> nil | true | false.
stream_event_waiters_present(State, RequestId) ->
    #state{stream_queues = Queues} = State,
    case dict:find(RequestId, Queues) of
    {ok, {nil, _Events}} ->
        false;
    {ok, {_Waiter, _Events}} ->
        true;
    error ->
        nil
    end.
1124
1125
% Clears the waiter of RequestId's queue (sets it back to nil) and returns
% {State2, PreviousWaiter}. Queued events are kept.
-spec remove_stream_event_waiter(#state{}, request_id()) -> {#state{}, term()}.
remove_stream_event_waiter(State, RequestId) ->
    #state{stream_queues = Queues} = State,
    {ok, {Waiter, Events}} = dict:find(RequestId, Queues),
    Cleared = dict:store(RequestId, {nil, Events}, Queues),
    {State#state{stream_queues = Cleared}, Waiter}.
1136
1137
% Returns the request id of the active stream for PartId, or nil when the
% partition has no active stream.
-spec find_stream_req_id(#state{}, partition_id()) -> request_id() | nil.
find_stream_req_id(State, PartId) ->
    #state{active_streams = Streams} = State,
    case lists:keyfind(PartId, 1, Streams) of
    false ->
        nil;
    {PartId, ReqId} ->
        ReqId
    end.
1149
% Consumes and discards the BodyLength-byte body of an error reply, then maps
% Status to an error term with status_to_error/1.
% Returns {{error, Reason}, BufSocket2}. If the body cannot be read, the bare
% {error, Reason} from bufsocket_recv/3 is returned instead.
% NOTE(review): callers that destructure the result as {Response, BufSocket}
% mis-read that second form (Response = error, BufSocket = Reason).
-spec parse_error_response(#bufsocket{}, timeout(), integer(), integer()) ->
                    {{'error', atom() | {'status', integer()}}, #bufsocket{}} |
                    {'error', closed | inet:posix()}.
parse_error_response(BufSocket, Timeout, BodyLength, Status) ->
    case bufsocket_recv(BufSocket, BodyLength, Timeout) of
    {ok, _, BufSocket2} ->
        {status_to_error(Status), BufSocket2};
    {error, _} = Error ->
        Error
    end.
1160
% Translates a DCP response status into an {error, Reason} term. Known
% statuses get a descriptive atom; anything else becomes
% {error, {status, Status}}.
-spec status_to_error(integer()) -> {'error', atom() | {'status', integer()}}.
status_to_error(Status) ->
    case Status of
    ?DCP_STATUS_KEY_NOT_FOUND ->
        {error, vbucket_stream_not_found};
    ?DCP_STATUS_ERANGE ->
        {error, wrong_start_sequence_number};
    ?DCP_STATUS_KEY_EEXISTS ->
        {error, vbucket_stream_already_exists};
    ?DCP_STATUS_NOT_MY_VBUCKET ->
        {error, server_not_my_vbucket};
    ?DCP_STATUS_TMP_FAIL ->
        {error, vbucket_stream_tmp_fail};
    ?DCP_STATUS_NOT_SUPPORTED ->
        {error, not_supported};
    _ ->
        {error, {status, Status}}
    end.
1176
1177
% The worker process for handling the DCP connection's downstream pipe.
% It reads and parses downstream messages and sends them to the gen_server
% process Parent.
%
% Each iteration does the following:
%   - reads one ?DCP_HEADER_LEN-byte header, waiting with an infinite
%     timeout;
%   - decodes it with couch_dcp_consumer:parse_header/1;
%   - reads the body, if the header announces one, using Timeout.
% The inner case yields {Action, MsgAcc, BufSocket3}:
%   - Action = done: MsgAcc is a complete message. It is sent to Parent and
%     the accumulator is reset to [].
%   - Action = true: nothing is sent, and MsgAcc becomes the accumulator for
%     the next iteration. Stat packets collect here until an empty-body stat
%     packet ends the sequence.
% A failure while reading a header ends the process with
% exit({conn_error, Reason}).
% NOTE(review): the -spec advertises a return value, but every path either
% recurses or exits, so the function never returns normally.
-spec receive_worker(#bufsocket{}, timeout(), pid(), list()) ->
                                                    closed | inet:posix().
receive_worker(BufSocket, Timeout, Parent, MsgAcc0) ->
    case bufsocket_recv(BufSocket, ?DCP_HEADER_LEN, infinity) of
    {ok, Header, BufSocket2} ->
        {Action, MsgAcc, BufSocket3} =
        case couch_dcp_consumer:parse_header(Header) of
        {control_request, Status, RequestId} ->
            {done, {stream_response, RequestId, {RequestId, Status}},
                BufSocket2};
        {noop_request, RequestId} ->
            {done, {stream_noop, RequestId}, BufSocket2};
        % Status-OK buffer-ack replies produce nothing for Parent.
        % NOTE(review): this clause also resets the accumulator to [], which
        % would drop stat packets collected so far if an ack arrives in the
        % middle of a stats sequence. A non-OK buffer-ack has no clause and
        % raises case_clause. Confirm both are intended.
        {buffer_ack, ?DCP_STATUS_OK, _RequestId} ->
            {true, [], BufSocket2};
        % Stream-request reply. The response depends on the status:
        %   - OK: the failover log;
        %   - ROLLBACK: the rollback sequence number;
        %   - otherwise: an error term derived from the status.
        {stream_request, Status, RequestId, BodyLength} ->
            {Response, BufSocket5} = case Status of
            ?DCP_STATUS_OK ->
                case receive_failover_log(
                    BufSocket2, Timeout, Status, BodyLength) of
                {{ok, FailoverLog}, BufSocket4} ->
                    {{failoverlog, FailoverLog}, BufSocket4};
                Error ->
                    {Error, BufSocket2}
                end;
            ?DCP_STATUS_ROLLBACK ->
                case receive_rollback_seq(
                    BufSocket2, Timeout, BodyLength) of
                {ok, RollbackSeq, BufSocket4} ->
                    {{rollback, RollbackSeq}, BufSocket4};
                Error ->
                    {Error, BufSocket2}
                end;
            % NOTE(review): if the error body cannot be read,
            % parse_error_response/4 returns a bare {error, Reason}. The
            % {Response, BufSocket5} match then splits it into
            % Response = error and BufSocket5 = Reason.
            _ ->
                parse_error_response(BufSocket2, Timeout, BodyLength, Status)
            end,
            {done, {stream_response, RequestId, {RequestId, Response}},
                BufSocket5};
        % NOTE(review): receive_failover_log/4 returns a bare {error, Reason}
        % in three cases: an empty body, a non-OK status, or a read failure.
        % The match below then yields Response = error and
        % BufSocket5 = Reason. The next header read hits the
        % non-#bufsocket{} value and crashes with badmatch. Unlike the
        % stream_request clause, the error is not paired with BufSocket2
        % here.
        {failover_log, Status, RequestId, BodyLength} ->
            {Response, BufSocket5} = receive_failover_log(
                BufSocket2, Timeout, Status, BodyLength),
            {done, {stream_response, RequestId, Response}, BufSocket5};
        {stream_close, Status, RequestId, BodyLength} ->
            {Response, BufSocket5} = case Status of
            ?DCP_STATUS_OK ->
                {ok, BufSocket2};
            _ ->
                parse_error_response(BufSocket2, Timeout, BodyLength, Status)
            end,
            {done, {stream_response, RequestId, Response}, BufSocket5};
        % Stats arrive as one packet per stat. The sequence ends with an
        % empty-body packet, at which point the collected stats are sent to
        % Parent in arrival order.
        {stats, Status, RequestId, BodyLength, KeyLength} ->
            case BodyLength of
            0 ->
                case Status of
                ?DCP_STATUS_OK ->
                    StatAcc = lists:reverse(MsgAcc0),
                    {done, {stream_response, RequestId, {ok, StatAcc}},
                        BufSocket2};
                % Some errors might not contain a body
                _ ->
                    Error = {error, {Status, <<>>}},
                    {done, {stream_response, RequestId, Error}, BufSocket2}
                end;
            _ ->
                % NOTE(review): if receive_stat/5 yields {{error, _}, Buf}
                % (a parsed error stat), neither clause below matches and
                % case_clause is raised. On a read failure the loop continues
                % with BufSocket2 even though the socket reported an error.
                case receive_stat(
                    BufSocket2, Timeout, Status, BodyLength, KeyLength) of
                {{ok, Stat}, BufSocket5} ->
                    {true, [Stat | MsgAcc0], BufSocket5};
                {error, _} = Error ->
                    {done, {stream_response, RequestId, Error}, BufSocket2}
                end
            end;
        % A read failure makes the {ok, _, _} match below fail with badmatch.
        {snapshot_marker, _PartId, RequestId, BodyLength} ->
            {ok, SnapshotMarker, BufSocket5} = receive_snapshot_marker(
                BufSocket2, Timeout, BodyLength),
            {done, {stream_event, RequestId,
                {snapshot_marker, SnapshotMarker, BodyLength}}, BufSocket5};
        % Stream events carry BodyLength along, which is used for buffer-ack
        % accounting.
        % NOTE(review): the mutation, deletion, stream_end and all_seqs
        % helpers below return a bare {error, Reason} when the body cannot be
        % read. The {X, BufSocket5} matches then mis-split it, in the same
        % way as the failover_log clause above.
        {snapshot_mutation, PartId, RequestId, KeyLength, BodyLength,
                ExtraLength, Cas, DataType} ->
            {Mutation, BufSocket5} = receive_snapshot_mutation(
                BufSocket2, Timeout, PartId, KeyLength, BodyLength, ExtraLength,
                Cas, DataType),
            {done, {stream_event, RequestId,
                    {snapshot_mutation, Mutation, BodyLength}}, BufSocket5};
        % For the indexer and XDCR there's no difference between a deletion
        % and an expiration. In both cases the items should get removed.
        % Hence the same code can be used after the initial header
        % parsing (the body is the same).
        {OpCode, PartId, RequestId, KeyLength, BodyLength, Cas, DataType} when
                OpCode =:= snapshot_deletion orelse
                OpCode =:= snapshot_expiration ->
            {Deletion, BufSocket5} = receive_snapshot_deletion(
                BufSocket2, Timeout, PartId, KeyLength, BodyLength,
                Cas, DataType),
            {done, {stream_event, RequestId,
                {snapshot_deletion, Deletion, BodyLength}}, BufSocket5};
        {stream_end, PartId, RequestId, BodyLength} ->
            {Flag, BufSocket5} = receive_stream_end(BufSocket2,
                Timeout, BodyLength),
            {done, {stream_event, RequestId, {stream_end,
                {RequestId, PartId, Flag}, BodyLength}}, BufSocket5};
        {all_seqs, Status, RequestId, BodyLength} ->
            {Resp, BufSocket5} = receive_all_seqs(
                BufSocket2, Timeout, Status, BodyLength),
            {done, {stream_response, RequestId, Resp}, BufSocket5}
        end,
        case Action of
        done ->
            Parent ! MsgAcc,
            receive_worker(BufSocket3, Timeout, Parent, []);
        true ->
            receive_worker(BufSocket3, Timeout, Parent, MsgAcc)
        end;
    {error, Reason} ->
        exit({conn_error, Reason})
    end.
1295
% Check if we need to send a buffer ack to the server, and send it if
% required.
%
% The unacknowledged byte count (total_buffer_size) is first grown:
%   - Type = mutation: add the size of Event (get_event_size/1);
%   - Type = remove_stream: add the sizes of all events still queued for
%     RequestId. Event is ignored in this case.
% What happens next depends on the new count:
%   - above max_buffer_size * ?DCP_BUFFER_ACK_THRESHOLD: a buffer ack for the
%     whole amount is sent and the counter resets to 0;
%   - 0: the state is returned untouched;
%   - otherwise: the new count is stored.
% Error events ({error, _, _}) never count towards the total.
% Returns {ok, State2}, or {error, Reason} if the ack could not be sent.
-spec check_and_send_buffer_ack(#state{}, request_id(), tuple() | nil, atom()) ->
                        {ok, #state{}} | {error, closed | inet:posix()}.
check_and_send_buffer_ack(State, _RequestId, {error, _, _}, _Type) ->
    {ok, State};
check_and_send_buffer_ack(State, RequestId, Event, Type) ->
    #state{
        bufsocket = BufSocket,
        max_buffer_size = MaxBufSize,
        stream_queues = StreamQueues,
        total_buffer_size = Size
    } = State,
    Size2 = case Type of
    remove_stream ->
        case dict:find(RequestId, StreamQueues) of
        error ->
            Size;
        {ok, {_, EvQueue}} ->
            get_queue_size(EvQueue, Size)
        end;
    mutation ->
        Size + get_event_size(Event)
    end,
    MaxAckSize = MaxBufSize * ?DCP_BUFFER_ACK_THRESHOLD,
    if
    Size2 > MaxAckSize ->
        BufferAckRequest = couch_dcp_consumer:encode_buffer_request(0, Size2),
        case bufsocket_send(BufSocket, BufferAckRequest) of
        ok ->
            {ok, State#state{total_buffer_size = 0}};
        {error, _Reason} = Error ->
            % bufsocket_send/2 already returns {error, Reason}; pass it on
            % as-is so callers see the shape promised by the -spec.
            Error
        end;
    Size2 == 0 ->
        % Nothing is outstanding, so there is nothing to record.
        {ok, State};
    true ->
        {ok, State#state{total_buffer_size = Size2}}
    end.
1347
% Acknowledges every byte currently counted in total_buffer_size. The
% request is built with couch_dcp_consumer:encode_buffer_request/2.
% On a successful send the counter resets to 0; a send failure is returned
% as {error, Reason} with the state unchanged.
-spec send_buffer_ack(#state{}) ->
            {ok, #state{}} | {error, closed | inet:posix()}.
send_buffer_ack(State) ->
    #state{bufsocket = BufSocket, total_buffer_size = Unacked} = State,
    AckPacket = couch_dcp_consumer:encode_buffer_request(0, Unacked),
    case bufsocket_send(BufSocket, AckPacket) of
    {error, _Reason} = SendError ->
        SendError;
    ok ->
        {ok, State#state{total_buffer_size = 0}}
    end.
1362
1363
% Sends a `connection` control request carrying Size, built with
% couch_dcp_consumer:encode_control_request/3.
% On success it does the following:
%   - bumps the request id;
%   - records the request as pending (no reply target);
%   - stores Size as max_buffer_size.
% A send failure is returned as {error, Reason}.
-spec set_buffer_size(#state{}, non_neg_integer()) -> {ok, #state{}} |
                                            {error, closed | inet:posix()}.
set_buffer_size(State, Size) ->
    #state{bufsocket = BufSocket, request_id = ReqId} = State,
    Packet = couch_dcp_consumer:encode_control_request(ReqId, connection, Size),
    case bufsocket_send(BufSocket, Packet) of
    {error, Reason} ->
        {error, Reason};
    ok ->
        Tracked = add_pending_request(next_request_id(State), ReqId,
                                      {control_request, Size}, nil),
        {ok, Tracked#state{max_buffer_size = Size}}
    end.
1381
% Adds the wire size (get_event_size/1) of every event in EvQueue, from
% front to back, to Size and returns the total.
-spec get_queue_size(queue(), non_neg_integer()) -> non_neg_integer().
get_queue_size(EvQueue, Size) ->
    lists:foldl(fun(Event, Acc) -> Acc + get_event_size(Event) end,
                Size, queue:to_list(EvQueue)).
1391
% Size an event accounts for in buffer-ack bookkeeping: the fixed header
% length plus the body length recorded with the event.
-spec get_event_size({atom(), #dcp_doc{}, non_neg_integer()}) -> non_neg_integer().
get_event_size({_Type, _Doc, BodyLength}) ->
    BodyLength + ?DCP_HEADER_LEN.
1395
% Drops the stream_info entry belonging to PartId's active stream. The state
% is returned unchanged when PartId has no entry in active_streams.
-spec remove_stream_info(partition_id(), #state{}) -> #state{}.
remove_stream_info(PartId, State) ->
    #state{active_streams = Streams, stream_info = Infos} = State,
    case lists:keyfind(PartId, 1, Streams) of
    {_, ReqId} ->
        State#state{stream_info = dict:erase(ReqId, Infos)};
    false ->
        State
    end.
1409
% Stores a fresh #stream_info{} for RequestId describing the requested
% partition, UUID, sequence range and flags. Any previous entry for
% RequestId is overwritten.
-spec insert_stream_info(partition_id(), request_id(), uuid(), non_neg_integer(),
                        non_neg_integer(), #state{}, non_neg_integer()) -> #state{}.
insert_stream_info(PartId, RequestId, PartUuid, StartSeq, EndSeq, State, Flags) ->
    #state{stream_info = Infos} = State,
    Info = #stream_info{
        part_id = PartId,
        part_uuid = PartUuid,
        start_seq = StartSeq,
        end_seq = EndSeq,
        flags = Flags
    },
    State#state{stream_info = dict:store(RequestId, Info, Infos)}.
1423
% Returns the #stream_info{} stored for RequestId, or nil when none exists.
-spec find_stream_info(request_id(), #state{}) -> #stream_info{} | nil.
find_stream_info(RequestId, State) ->
    #state{stream_info = Infos} = State,
    case dict:find(RequestId, Infos) of
    error ->
        nil;
    {ok, Info} ->
        Info
    end.
1435
% Sends a DCP stream request for PartId with the given UUID, sequence range,
% snapshot range and Flags.
% Once the request is written it does the following:
%   - records the stream parameters (insert_stream_info/7);
%   - bumps the request id;
%   - registers the request as pending, so that the reply can later be
%     routed back to From.
% Returns the updated state, or {error, Reason} if the request could not be
% sent.
-spec add_new_stream({partition_id(), uuid(), update_seq(), update_seq(),
        {update_seq(), update_seq()}, 0..255},
        {pid(), term()}, #state{}) -> #state{} | {error, closed | inet:posix()}.
add_new_stream({PartId, PartUuid, StartSeq, EndSeq,
    {SnapSeqStart, SnapSeqEnd}, Flags}, From, State) ->
    #state{
        bufsocket = BufSocket,
        request_id = RequestId
    } = State,
    StreamRequest = couch_dcp_consumer:encode_stream_request(
        PartId, RequestId, Flags, StartSeq, EndSeq, PartUuid, SnapSeqStart, SnapSeqEnd),
    case bufsocket_send(BufSocket, StreamRequest) of
    ok ->
        State2 = insert_stream_info(PartId, RequestId, PartUuid, StartSeq, EndSeq,
            State, Flags),
        State3 = next_request_id(State2),
        add_pending_request(State3, RequestId, {add_stream, PartId}, From);
    {error, _Reason} = Error ->
        % bufsocket_send/2 already tags its failure; wrapping it again would
        % produce {error, {error, Reason}}, contrary to the -spec.
        Error
    end.
1456
% Re-runs init/1 with the stored args to obtain a new connection and receive
% worker. The new bufsocket and worker_pid are swapped into the existing
% state; the other stream bookkeeping is kept.
% Effects on the old connection's work:
%   - every pending request is failed with {error, dcp_conn_closed}, and the
%     pending-request table is cleared;
%   - this process is sent an {error, dcp_conn_closed, 0} stream event for
%     every stream in stream_info.
% Returns {noreply, State3}, or {stop, Reason, State} when init/1 stops.
-spec restart_worker(#state{}) -> {noreply, #state{}} | {stop, term(), #state{}}.
restart_worker(State) ->
    #state{
        args = Args
    } = State,
    case init(Args) of
    {stop, Reason} ->
        {stop, Reason, State};
    {ok, State2} ->
        #state{
            bufsocket = BufSocket,
            worker_pid = WorkerPid
        } = State2,
        % Replace the socket
        State3 = State#state{
            bufsocket = BufSocket,
            pending_requests = dict:new(),
            worker_pid = WorkerPid
        },
        Error = {error, dcp_conn_closed},
        % Fail every request still waiting on the old connection. The reply
        % channel depends on how the request was registered.
        lists:foreach(fun({_RequestId, {ReqInfo, SendTo}}) ->
            case ReqInfo of
            get_stats ->
                {MRef, From} = SendTo,
                From ! {get_stats, MRef, Error};
            all_seqs ->
                {MRef, From} = SendTo,
                From ! {all_seqs, MRef, Error};
            {control_request, _} ->
                ok;
            _ ->
                gen_server:reply(SendTo, Error)
            end
        end, dict:to_list(State#state.pending_requests)),
        % Post an error event for every known stream to this process.
        Self = self(),
        lists:foreach(fun(RequestId) ->
            Self ! {stream_event, RequestId, {error, dcp_conn_closed, 0}}
        end, dict:fetch_keys(State#state.stream_info)),
        {noreply, State3}
    end.
1497
% Records {StartSeq, EndSeq} from a snapshot marker as the snapshot_seq of
% RequestId's stream_info. Unknown request ids leave the state unchanged.
-spec store_snapshot_seq(request_id(), {update_seq(), update_seq(),
                            non_neg_integer()}, #state{}) -> #state{}.
store_snapshot_seq(RequestId, Data, State) ->
    {SnapStart, SnapEnd, _Type} = Data,
    #state{stream_info = Infos} = State,
    case dict:find(RequestId, Infos) of
    error ->
        State;
    {ok, Info} ->
        Updated = Info#stream_info{snapshot_seq = {SnapStart, SnapEnd}},
        State#state{stream_info = dict:store(RequestId, Updated, Infos)}
    end.
1513
% Moves the start_seq of RequestId's stream_info up to the seq of the doc
% just received. Unknown request ids leave the state unchanged.
-spec store_snapshot_mutation(request_id(), #dcp_doc{}, #state{}) -> #state{}.
store_snapshot_mutation(RequestId, Data, State) ->
    #dcp_doc{seq = DocSeq} = Data,
    #state{stream_info = Infos} = State,
    case dict:find(RequestId, Infos) of
    {ok, Info} ->
        Updated = Info#stream_info{start_seq = DocSeq},
        State#state{stream_info = dict:store(RequestId, Updated, Infos)};
    error ->
        State
    end.
1530
% Strips the trailing body length from a queued stream event, e.g.
% {snapshot_mutation, Doc, BodyLength} -> {snapshot_mutation, Doc}.
% The payload may be a #dcp_doc{}, a plain tuple (snapshot marker or
% stream-end data), or an atom (as in the {error, dcp_conn_closed, 0} event).
-spec remove_body_len({atom(), tuple() | #dcp_doc{} | atom(), non_neg_integer()}) ->
                                     {atom(), tuple() | #dcp_doc{} | atom()}.
remove_body_len({Type, Data, _BodyLength}) ->
    {Type, Data}.
1535