1% Licensed under the Apache License, Version 2.0 (the "License"); you may not
2% use this file except in compliance with the License. You may obtain a copy of
3% the License at
4%
5%   http://www.apache.org/licenses/LICENSE-2.0
6%
7% Unless required by applicable law or agreed to in writing, software
8% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10% License for the specific language governing permissions and limitations under
11% the License.
12
13-module(couch_dcp_client).
14-behaviour(gen_server).
15
16% Public API
17-export([start/6]).
18-export([add_stream/6, get_seqs/2, get_num_items/2,
19    get_failover_log/2]).
20-export([get_stream_event/2, remove_stream/2, list_streams/1]).
21-export([enum_docs_since/8, restart_worker/1]).
22-export([get_seqs_async/1]).
23
24% gen_server callbacks
25-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2, code_change/3]).
26-export([format_status/2]).
27
28-include("couch_db.hrl").
29-include_lib("couch_dcp/include/couch_dcp.hrl").
30-include_lib("couch_dcp/include/couch_dcp_typespecs.hrl").
% Default value of #state.max_buffer_size.
-define(MAX_BUF_SIZE, 10485760).
% Timeout (ms) for gen_server calls that need a server round trip, and the
% time a stream-event waiter waits before asking the client to log progress.
-define(TIMEOUT, 60000).
% Time (ms) to wait for a stats / all-seqs reply before logging and retrying.
-define(TIMEOUT_STATS, 2000).
% Delay (ms) before restarting a receive worker that exited with conn_error.
-define(DCP_RETRY_TIMEOUT, 2000).
% Value of the {buffer, _} socket option used when connecting.
-define(DCP_BUF_WINDOW, 65535).
% Extracts the raw socket from a #bufsocket{} record.
-define(SOCKET(BufSocket), BufSocket#bufsocket.sockpid).

-type mutations_fold_fun() :: fun().
-type mutations_fold_acc() :: any().

% A socket plus a byte buffer; presumably sockbuf holds bytes already read
% from the socket but not yet consumed by bufsocket_recv (TODO confirm).
-record(bufsocket, {
    sockpid = nil                   :: socket() | nil,
    sockbuf = <<>>                  :: binary()
}).

-record(state, {
    bufsocket = nil                 :: #bufsocket{} | nil,
    % Receive timeout (ms); init/1 replaces it with dcp/connection_timeout.
    timeout = 5000                  :: timeout(),
    % Id to use for the next request sent to the server.
    request_id = 0                  :: request_id(),
    % Requests awaiting a server response, keyed by request id; looked up as
    % {RequestInfo, ReplyTo} by find_pending_request.
    pending_requests = dict:new()   :: dict(),
    % RequestId -> {Waiter, EventQueue} for every open stream.
    stream_queues = dict:new()      :: dict(),
    % One {PartId, _} entry per active stream.
    active_streams = []             :: list(),
    % Receive worker; stays undefined until init/1 has spawned it.
    worker_pid                      :: pid() | undefined,
    max_buffer_size = ?MAX_BUF_SIZE :: integer(),
    total_buffer_size = 0           :: non_neg_integer(),
    % Per-stream metadata keyed by request id (erased on stream_end/error).
    stream_info = dict:new()        :: dict(),
    % [Name, Bucket, AdmUser, AdmPasswd, BufferSize, Flags], kept so that the
    % connection can be re-established.
    args = []                       :: list()
}).

-record(stream_info, {
    part_id = 0                     :: partition_id(),
    part_uuid = 0                   :: uuid(),
    start_seq = 0                   :: update_seq(),
    end_seq = 0                     :: update_seq(),
    snapshot_seq = {0, 0}           :: {update_seq(), update_seq()},
    flags = 0                       :: non_neg_integer()
}).
68
% This gen_server implements a DCP client with vbucket stream multiplexing.
% The client spawns a worker process to handle all response and event messages
% received from the DCP server. For simplicity, responses are classified into
% two types of messages: stream_response and stream_event. For any DCP request,
% the corresponding response message is called a stream_response. A vbucket
% stream request, however, works like a subscription: after the response
% confirming that the stream was initiated arrives, the server starts sending
% vbucket events. These messages are called stream_events.
%
% The user can request a stream using the add_stream() API, and the
% corresponding events can be received using the get_stream_event() API.
79
80
81% Public API
82
% Starts a DCP client process. The new process is linked to the caller
% (gen_server:start_link/3 is used), and the arguments are handed unchanged
% to init/1.
-spec start(binary(), binary(), binary(), binary(), non_neg_integer(), non_neg_integer()) ->
    {ok, pid()} | ignore | {error, {already_started, pid()} | term()}.
start(Name, Bucket, AdmUser, AdmPasswd, BufferSize, Flags) ->
    InitArgs = [Name, Bucket, AdmUser, AdmPasswd, BufferSize, Flags],
    gen_server:start_link(?MODULE, InitArgs, []).
87
% Requests a stream for PartId covering StartSeq..EndSeq and waits at most
% ?TIMEOUT ms for the answer. On success the reply carries the request id that
% identifies the stream in get_stream_event/2.
-spec add_stream(pid(), partition_id(), uuid(), update_seq(),
    update_seq(), dcp_data_type()) -> {error, term()} | {request_id(), term()}.
add_stream(Pid, PartId, PartUuid, StartSeq, EndSeq, Flags) ->
    Request = {add_stream, PartId, PartUuid, StartSeq, EndSeq, Flags},
    gen_server:call(Pid, Request, ?TIMEOUT).
93
94
% Asks the server to close the stream of PartId, waiting at most ?TIMEOUT ms.
-spec remove_stream(pid(), partition_id()) ->
                            'ok' | {'error', term()}.
remove_stream(Pid, PartId) ->
    Request = {remove_stream, PartId},
    gen_server:call(Pid, Request, ?TIMEOUT).
99
100
% Returns the partition ids of the active streams. The answer is computed from
% the client's local state, so the default gen_server:call/2 timeout is used.
-spec list_streams(pid()) -> list().
list_streams(Pid) ->
    Request = list_streams,
    gen_server:call(Pid, Request).
104
105
% Waits for the {get_stats, MRef, Reply} answer of the client Pid. Logs an
% error every ?TIMEOUT_STATS ms while waiting, and exits with
% {dcp_client_died, Pid, Reason} if the monitored client goes down.
-spec get_stats_reply(pid(), reference()) -> term().
get_stats_reply(Pid, MRef) ->
    receive
    {'DOWN', MRef, process, Pid, Reason} ->
        exit({dcp_client_died, Pid, Reason});
    {get_stats, MRef, Reply} ->
        Reply
    after ?TIMEOUT_STATS ->
        WaitedSecs = ?TIMEOUT_STATS / 1000,
        ?LOG_ERROR("dcp client (~p): stats timed out after ~p seconds."
                   " Waiting...", [Pid, WaitedSecs]),
        get_stats_reply(Pid, MRef)
    end.
119
% Waits for the {all_seqs, MRef, Reply} answer of the client Pid. Logs an
% error every ?TIMEOUT_STATS ms while waiting, and exits with
% {dcp_client_died, Pid, Reason} if the monitored client goes down.
-spec get_all_seqs_reply(pid(), reference()) -> term().
get_all_seqs_reply(Pid, MRef) ->
    receive
    {'DOWN', MRef, process, Pid, Reason} ->
        exit({dcp_client_died, Pid, Reason});
    {all_seqs, MRef, Reply} ->
        Reply
    after ?TIMEOUT_STATS ->
        WaitedSecs = ?TIMEOUT_STATS / 1000,
        ?LOG_ERROR("dcp client (~p): dcp all seqs timed out after ~p seconds."
                   " Waiting...", [Pid, WaitedSecs]),
        get_all_seqs_reply(Pid, MRef)
    end.
133
% Sends a stats request for stat group Name (PartId may be nil) to the client
% and blocks until the reply arrives, monitoring the client meanwhile.
-spec get_stats(pid(), binary(), partition_id() | nil) -> term().
get_stats(Pid, Name, PartId) ->
    ReplyTo = self(),
    MRef = erlang:monitor(process, Pid),
    Pid ! {get_stats, Name, PartId, {MRef, ReplyTo}},
    Result = get_stats_reply(Pid, MRef),
    % Remove the monitor and drop any 'DOWN' message it may have queued.
    erlang:demonitor(MRef, [flush]),
    Result.
141
142
% Asynchronous variant of get_seqs/2: sends the request and returns at once.
% The answer is later delivered to the calling process as
% {all_seqs, nil, StatsResponse}.
-spec get_seqs_async(pid()) -> ok.
get_seqs_async(Pid) ->
    Request = {get_all_seqs, {nil, self()}},
    Pid ! Request,
    ok.
151
% Returns the sequence numbers of all partitions (SortedPartIds =:= nil) or
% only of the partitions listed in the ordset SortedPartIds.
-spec get_seqs(pid(), ordsets:ordset(partition_id()) | nil) ->
         {ok, partition_seqs()} | {error, term()}.
get_seqs(Pid, SortedPartIds) ->
    case get_all_seqs(Pid) of
    {error, _} = Error ->
        Error;
    {ok, AllSeqs} when SortedPartIds =:= nil ->
        {ok, AllSeqs};
    {ok, AllSeqs} ->
        {ok, couch_set_view_util:filter_seqs(SortedPartIds, AllSeqs)}
    end.
167
% Synchronously requests the sequence numbers of all partitions, monitoring
% the client while waiting for the reply.
-spec get_all_seqs(pid()) -> term().
get_all_seqs(Pid) ->
    ReplyTo = self(),
    MRef = erlang:monitor(process, Pid),
    Pid ! {get_all_seqs, {MRef, ReplyTo}},
    Result = get_all_seqs_reply(Pid, MRef),
    % Remove the monitor and drop any 'DOWN' message it may have queued.
    erlang:demonitor(MRef, [flush]),
    Result.
175
% Returns the number of items of PartId, read from the "vb_<PartId>:num_items"
% entry of the "vbucket-details" stats group.
-spec get_num_items(pid(), partition_id()) ->
                           {ok, non_neg_integer()} | {error, not_my_vbucket}.
get_num_items(Pid, PartId) ->
    case get_stats(Pid, <<"vbucket-details">>, PartId) of
    {error, {?DCP_STATUS_NOT_MY_VBUCKET, _}} ->
        {error, not_my_vbucket};
    {ok, Stats} ->
        StatKey = iolist_to_binary(
            ["vb_", integer_to_list(PartId), ":num_items"]),
        {StatKey, NumItemsBin} = lists:keyfind(StatKey, 1, Stats),
        {ok, list_to_integer(binary_to_list(NumItemsBin))}
    end.
189
190
% Requests the failover log of PartId from the server.
% Like add_stream/6 and remove_stream/2, this needs a round trip to the DCP
% server, so it uses the same ?TIMEOUT. The default 5000 ms gen_server:call
% timeout was much shorter than what the other round trips allow.
-spec get_failover_log(pid(), partition_id()) ->
                              {error, no_failover_log_found | dcp_status()} |
                              {ok, partition_version()}.
get_failover_log(Pid, PartId) ->
    gen_server:call(Pid, {get_failover_log, PartId}, ?TIMEOUT).
196
197
% Fetches the next event of the stream identified by ReqId, blocking until the
% client answers. Exits with {dcp_client_died, Pid, Reason} if it dies.
-spec get_stream_event(pid(), request_id()) ->
                              {atom(), #dcp_doc{}} | {'error', term()}.
get_stream_event(Pid, ReqId) ->
    Self = self(),
    MRef = erlang:monitor(process, Pid),
    Pid ! {get_stream_event, ReqId, Self},
    Event = get_stream_event_get_reply(Pid, ReqId, MRef),
    % Remove the monitor and drop any 'DOWN' message it may have queued.
    erlang:demonitor(MRef, [flush]),
    Event.
206
% Waits for {stream_event, ReqId, Reply}. Every ?TIMEOUT ms without an answer
% it asks the client to log what it knows about the stream, then keeps
% waiting. Exits with {dcp_client_died, Pid, Reason} if the client dies.
get_stream_event_get_reply(Pid, ReqId, MRef) ->
    receive
    {'DOWN', MRef, process, Pid, Reason} ->
        exit({dcp_client_died, Pid, Reason});
    {stream_event, ReqId, Reply} ->
        Reply
    after ?TIMEOUT ->
        Pid ! {print_log, ReqId},
        get_stream_event_get_reply(Pid, ReqId, MRef)
    end.
218
% Streams the changes of PartId between StartSeq and EndSeq0 through
% CallbackFn, trying the partition versions in PartVersions in order.
%
% For each candidate version a stream is requested with its UUID:
%  - vbucket_stream_not_found moves on to the next version; once all versions
%    are exhausted {rollback, 0} is returned.
%  - vbucket_stream_tmp_fail is retried with the same versions after 100 ms.
%  - On success CallbackFn is first called with
%    {part_versions, {PartId, FailoverLog}}, then receive_events/4 folds the
%    stream events, and {ok, Acc, FailoverLog} is returned.
% Any other response from add_stream/6 is returned unchanged.
-spec enum_docs_since(pid(), partition_id(), partition_version(), update_seq(),
                      update_seq(), 0..255, mutations_fold_fun(),
                      mutations_fold_acc()) ->
                             {error, vbucket_stream_not_found |
                              wrong_start_sequence_number |
                              too_large_failover_log } |
                             {rollback, update_seq()} |
                             {ok, mutations_fold_acc(), partition_version()}.
enum_docs_since(_, _, [], _, _, _, _, _) ->
    % No matching partition version found. Recreate the index from scratch
    {rollback, 0};
enum_docs_since(Pid, PartId, PartVersions, StartSeq, EndSeq0, Flags,
        CallbackFn, InAcc) ->
    [PartVersion | PartVersionsRest] = PartVersions,
    {PartUuid, _} = PartVersion,
    EndSeq = case EndSeq0 < StartSeq of
    true ->
        ?LOG_INFO("dcp client (~p): Expecting a rollback for partition ~p. "
        "Found start_seqno > end_seqno (~p > ~p).",
        [Pid, PartId, StartSeq, EndSeq0]),
        StartSeq;
    false ->
        EndSeq0
    end,

    case add_stream(Pid, PartId, PartUuid, StartSeq, EndSeq, Flags) of
    {error, _} = Error ->
        Error;
    {RequestId, Resp} ->
        case Resp of
        {failoverlog, FailoverLog} ->
            case length(FailoverLog) > ?DCP_MAX_FAILOVER_LOG_SIZE of
            true ->
                % The stream is already open on the server, but the caller
                % never learns its request id. Nothing would ever consume
                % (and acknowledge) its events, so close it (best effort)
                % before reporting the error.
                _ = remove_stream(Pid, PartId),
                {error, too_large_failover_log};
            false ->
                InAcc2 = CallbackFn({part_versions, {PartId, FailoverLog}}, InAcc),
                case receive_events(Pid, RequestId, CallbackFn, InAcc2) of
                {ok, InAcc3} ->
                    {ok, InAcc3, FailoverLog};
                Error ->
                    Error
                end
            end;
        {error, vbucket_stream_not_found} ->
            enum_docs_since(Pid, PartId, PartVersionsRest, StartSeq, EndSeq,
                Flags, CallbackFn, InAcc);
        {error, vbucket_stream_tmp_fail} ->
            ?LOG_INFO("dcp client (~p): Temporary failure on stream request "
                "on partition ~p. Retrying...", [Pid, PartId]),
            timer:sleep(100),
            enum_docs_since(Pid, PartId, PartVersions, StartSeq, EndSeq,
                Flags, CallbackFn, InAcc);
        _ ->
            Resp
        end
    end.
275
276
277% gen_server callbacks
278
% Connects to the local DCP port, authenticates as admin, selects the bucket
% and opens the DCP connection. It then spawns the receive worker, hands it
% the socket, and configures the connection buffer size.
-spec init([binary() | non_neg_integer()]) -> {ok, #state{}} |
                    {stop, sasl_auth_failed | closed | inet:posix()}.
init([Name, Bucket, AdmUser, AdmPasswd, BufferSize, Flags] = InitArgs) ->
    ConnTimeout = list_to_integer(
        couch_config:get("dcp", "connection_timeout")),
    Port = list_to_integer(couch_config:get("dcp", "port")),
    SockOpts = [binary, {packet, raw}, {active, true}, {nodelay, true},
                {buffer, ?DCP_BUF_WINDOW}],
    case gen_tcp:connect("localhost", Port, SockOpts, ConnTimeout) of
    {error, ConnectReason} ->
        {stop, {error, {dcp_socket_connect_failed, ConnectReason}}};
    {ok, Sock} ->
        InitialState = #state{
            bufsocket = #bufsocket{sockpid = Sock, sockbuf = <<>>},
            timeout = ConnTimeout,
            request_id = 0
        },
        % Auth as admin and select bucket for the connection
        case sasl_auth(AdmUser, AdmPasswd, InitialState) of
        {error, AuthReason} ->
            {stop, AuthReason};
        {ok, AuthedState} ->
            case select_bucket(Bucket, AuthedState) of
            {error, SelectReason} ->
                {stop, SelectReason};
            {ok, SelectedState} ->
                % Keep the start arguments so the connection can be rebuilt
                ArgsState = SelectedState#state{args = InitArgs},
                case open_connection(Name, Flags, ArgsState) of
                {error, OpenReason} ->
                    {stop, OpenReason};
                {ok, OpenState} ->
                    Self = self(),
                    process_flag(trap_exit, true),
                    WorkerBufSocket = OpenState#state.bufsocket,
                    Worker = spawn_link(fun() ->
                        receive_worker(WorkerBufSocket, ConnTimeout, Self, [])
                    end),
                    gen_tcp:controlling_process(Sock, Worker),
                    case set_buffer_size(OpenState, BufferSize) of
                    {error, BufReason} ->
                        exit(Worker, shutdown),
                        {stop, BufReason};
                    {ok, ReadyState} ->
                        {ok, ReadyState#state{worker_pid = Worker}}
                    end
                end
            end
        end
    end.
333
334
% Add caller to the request queue and wait for gen_server to reply on response
% arrival. A new stream starts with the snapshot {StartSeq, StartSeq}.
handle_call({add_stream, PartId, PartUuid, StartSeq, EndSeq, Flags},
        From, State) ->
    StreamSpec = {PartId, PartUuid, StartSeq, EndSeq, {StartSeq, StartSeq}, Flags},
    case add_new_stream(StreamSpec, From, State) of
    {error, _} = Error ->
        {reply, Error, State};
    NewState ->
        {noreply, NewState}
    end;

% Send a stream-close request; the caller is answered when the server responds.
handle_call({remove_stream, PartId}, From, State) ->
    CleanState = remove_stream_info(PartId, State),
    ReqId = CleanState#state.request_id,
    CloseReq = couch_dcp_consumer:encode_stream_close(PartId, ReqId),
    case bufsocket_send(CleanState#state.bufsocket, CloseReq) of
    {error, _Reason} = Error ->
        {reply, Error, CleanState};
    ok ->
        NextState = add_pending_request(next_request_id(CleanState), ReqId,
                                        {remove_stream, PartId}, From),
        {noreply, NextState}
    end;

% Reply with the partition ids of the active streams, in reverse list order.
handle_call(list_streams, _From, #state{active_streams = ActiveStreams} = State) ->
    PartIds = lists:map(fun({PartId, _}) -> PartId end, ActiveStreams),
    {reply, lists:reverse(PartIds), State};

% Send a failover-log request; the caller is answered when the server responds.
handle_call({get_failover_log, PartId}, From, State) ->
    ReqId = State#state.request_id,
    LogReq = couch_dcp_consumer:encode_failover_log_request(PartId, ReqId),
    case bufsocket_send(State#state.bufsocket, LogReq) of
    ok ->
        NextState = add_pending_request(next_request_id(State), ReqId,
                                        get_failover_log, From),
        {noreply, NextState};
    Error ->
        {reply, Error, State}
    end;

% Only used by unit test
handle_call(get_buffer_size, _From, State) ->
    {reply, State#state.total_buffer_size, State};

% Only used by unit test: reply with the old size and reset it to zero
handle_call(reset_buffer_size, _From, #state{total_buffer_size = OldSize} = State) ->
    {reply, OldSize, State#state{total_buffer_size = 0}};

% Only used by unit test
handle_call(flush_old_streams_meta, _From, State) ->
    {reply, ok, State#state{stream_info = dict:new()}};

% Only used by unit test
handle_call(get_socket, _From, State) ->
    {reply, State#state.bufsocket, State};

% Only used by unit test
handle_call({get_buffer_size, RequestId}, _From, State) ->
    {ok, {_Waiter, EventQueue}} = dict:find(RequestId, State#state.stream_queues),
    {reply, get_queue_size(EventQueue, 0), State};

% Only used by unit test
handle_call({get_event_size, Event}, _From, State) ->
    {reply, get_event_size(Event), State}.
416
% If a stream event for this requestId is present in the queue,
% dequeue it and reply back to the caller.
% Else, put the caller into the stream queue waiter list.
% Every answer is sent to From as {stream_event, RequestId, Reply}.
handle_info({get_stream_event, RequestId, From}, State) ->
    case stream_event_present(State, RequestId) of
    true ->
        Event = peek_stream_event(State, RequestId),
        {Optype, _, _} = Event,
        {Msg, State6} = case check_and_send_buffer_ack(State, RequestId, Event, mutation) of
        {ok, State3} ->
            % Event is already bound, so this also asserts that the dequeued
            % event is the one peeked above.
            {State4, Event} = dequeue_stream_event(State3, RequestId),
            State5 = case Optype =:= stream_end orelse Optype =:= error of
            true ->
                remove_request_queue(State4, RequestId);
            _ ->
                State4
            end,
            {remove_body_len(Event), State5};
        {error, Reason} ->
            {{error, Reason}, State}
        end,
        From ! {stream_event, RequestId, Msg},
        {noreply, State6};
    false ->
        case add_stream_event_waiter(State, RequestId, From) of
        nil ->
            Reply = {error, event_request_already_exists},
            From ! {stream_event, RequestId, Reply},
            {noreply, State};
        State2 ->
            {noreply, State2}
        end;
    nil ->
        Reply = {error, vbucket_stream_not_found},
        From ! {stream_event, RequestId, Reply},
        {noreply, State}
    end;


% Send a stats request to the server. From is {MRef, CallerPid}; the answer
% is delivered to CallerPid as {get_stats, MRef, Reply}.
handle_info({get_stats, Stat, PartId, From}, State) ->
    #state{
       request_id = RequestId,
       bufsocket = BufSocket
    } = State,
    SeqStatRequest = couch_dcp_consumer:encode_stat_request(
        Stat, PartId, RequestId),
    case bufsocket_send(BufSocket, SeqStatRequest) of
    ok ->
        State2 = next_request_id(State),
        State3 = add_pending_request(State2, RequestId, get_stats, From),
        {noreply, State3};
    Error ->
        % handle_info/2 cannot return {reply, ...}; doing so crashed the
        % server with bad_return_value and left the caller unanswered.
        % Deliver the error in the same format as a normal answer.
        {MRef, Caller} = From,
        Caller ! {get_stats, MRef, Error},
        {noreply, State}
    end;

% Request the sequence numbers of all partitions. From is {MRef, CallerPid}
% (MRef is nil for get_seqs_async/1); the answer is delivered to CallerPid as
% {all_seqs, MRef, Reply}.
handle_info({get_all_seqs, From}, State) ->
    #state{
       request_id = RequestId,
       bufsocket = BufSocket
    } = State,
    SeqStatRequest = couch_dcp_consumer:encode_all_seqs_request(RequestId),
    case bufsocket_send(BufSocket, SeqStatRequest) of
    ok ->
        State2 = next_request_id(State),
        State3 = add_pending_request(State2, RequestId, all_seqs, From),
        {noreply, State3};
    Error ->
        % See the get_stats clause: answer the caller instead of returning
        % an invalid {reply, ...} from handle_info/2.
        {MRef, Caller} = From,
        Caller ! {all_seqs, MRef, Error},
        {noreply, State}
    end;

% Handle response message send by connection receiver worker
% Reply back to waiting callers
handle_info({stream_response, RequestId, Msg}, State) ->
    State3 = case find_pending_request(State, RequestId) of
    {ReqInfo, SendTo} ->
        State2 = case ReqInfo of
        {add_stream, PartId} ->
            gen_server:reply(SendTo, Msg),
            case Msg of
            {_, {failoverlog, _}} ->
                add_request_queue(State, PartId, RequestId);
            _ ->
                State
            end;
        {remove_stream, PartId} ->
            gen_server:reply(SendTo, Msg),
            StreamReqId = find_stream_req_id(State, PartId),
            case check_and_send_buffer_ack(State, StreamReqId, nil, remove_stream) of
            {ok, NewState} ->
                case Msg of
                ok ->
                    remove_request_queue(NewState, StreamReqId);
                {error, vbucket_stream_not_found} ->
                    remove_request_queue(NewState, StreamReqId);
                _ ->
                    NewState
                end;
            {error, Error} ->
                % gen_server treats a value thrown from a callback as the
                % callback's return value, so throw/1 here ended in an obscure
                % bad_return_value crash. Exit with the intended reason.
                exit({control_ack_failed, Error})
            end;
        % Server sent the response for the internal control request
        {control_request, Size} ->
            State#state{max_buffer_size = Size};
        get_stats ->
            {MRef, From} = SendTo,
            From ! {get_stats, MRef, Msg},
            State;
        all_seqs ->
            {MRef, From} = SendTo,
            From ! {all_seqs, MRef, Msg},
            State;
        _ ->
            gen_server:reply(SendTo, Msg),
            State
        end,
        remove_pending_request(State2, RequestId);
    nil ->
        State
    end,
    {noreply, State3};

% Respond with the no op message reply to server
handle_info({stream_noop, RequestId}, State) ->
    #state {
        bufsocket = BufSocket
    } = State,
    NoOpResponse = couch_dcp_consumer:encode_noop_response(RequestId),
    % if noop reponse fails two times, server it self will close the connection
    bufsocket_send(BufSocket, NoOpResponse),
    {noreply, State};

% Handle events send by connection receiver worker
% If there is a waiting caller for stream event, reply to them
% Else, queue the event into the stream queue
handle_info({stream_event, RequestId, Event}, State) ->
    {Optype, Data, _Length} = Event,
    State2 = case Optype of
    Optype when Optype =:= stream_end orelse Optype =:= error ->
        #state{
            stream_info = StreamData
        } = State,
        StreamData2 = dict:erase(RequestId, StreamData),
        State#state{stream_info = StreamData2};
    snapshot_marker ->
        store_snapshot_seq(RequestId, Data, State);
    snapshot_mutation ->
        store_snapshot_mutation(RequestId, Data, State);
    snapshot_deletion ->
        store_snapshot_mutation(RequestId, Data, State)
    end,
    case stream_event_waiters_present(State2, RequestId) of
    true ->
        {State3, Waiter} = remove_stream_event_waiter(State2, RequestId),
        {Msg, State6} = case check_and_send_buffer_ack(State3, RequestId, Event, mutation) of
        {ok, State4} ->
            State5 = case Optype =:= stream_end orelse Optype =:= error of
            true ->
                remove_request_queue(State4, RequestId);
            _ ->
                State4
            end,
            {remove_body_len(Event), State5};
        {error, Reason} ->
            State4 = enqueue_stream_event(State3, RequestId, Event),
            {{error, Reason}, State4}
        end,
        Waiter ! {stream_event, RequestId, Msg},
        {noreply, State6};
    false ->
        State3 = enqueue_stream_event(State2, RequestId, Event),
        {noreply, State3};
    nil ->
        % We might have explicitly closed a stream using close_stream command
        % Before the server received close_stream message, it would have placed
        % some mutations in the network buffer queue. We still need to acknowledge
        % the mutations received.
        {ok, State3} = check_and_send_buffer_ack(State, RequestId, Event, mutation),
        {noreply, State3}
    end;

% The receive worker failed on the connection: wait, then restart it
handle_info({'EXIT', Pid, {conn_error, Reason}}, #state{worker_pid = Pid} = State) ->
    [Name, Bucket, _AdmUser, _AdmPasswd, _BufferSize, _Flags] = State#state.args,
    ?LOG_ERROR("dcp client (~s, ~s): dcp receive worker failed due to reason: ~p."
        " Restarting dcp receive worker...",
        [Bucket, Name, Reason]),
    timer:sleep(?DCP_RETRY_TIMEOUT),
    restart_worker(State);

% Any other exit of the receive worker stops the client
handle_info({'EXIT', Pid, Reason}, #state{worker_pid = Pid} = State) ->
    {stop, Reason, State};

% Sent by get_stream_event/2 callers that have been waiting for ?TIMEOUT ms
handle_info({print_log, ReqId}, State) ->
    [Name, Bucket, _AdmUser, _AdmPasswd, _BufferSize, _Flags] = State#state.args,
    case find_stream_info(ReqId, State) of
    nil ->
        ?LOG_ERROR(
            "dcp client (~s, ~s): Obtaining message from server timed out "
            "after ~p seconds [RequestId ~p]. Waiting...",
            [Bucket, Name, ?TIMEOUT / 1000, ReqId]);
    StreamInfo ->
        #stream_info{
           start_seq = Start,
           end_seq = End,
           part_id = PartId
        } = StreamInfo,
        ?LOG_ERROR("dcp client (~s, ~s): Obtaining mutation from server timed out "
            "after ~p seconds [RequestId ~p, PartId ~p, StartSeq ~p, EndSeq ~p]. Waiting...",
            [Bucket, Name, ?TIMEOUT / 1000, ReqId, PartId, Start, End])
    end,
    {noreply, State};

handle_info(Msg, State) ->
    {stop, {unexpected_info, Msg}, State}.
631
% Casts are not part of this server's protocol: any cast stops the server.
-spec handle_cast(any(), #state{}) ->
                         {stop, {unexpected_cast, any()}, #state{}}.
handle_cast(UnexpectedMsg, State) ->
    StopReason = {unexpected_cast, UnexpectedMsg},
    {stop, StopReason, State}.
636
637
% Takes the receive worker down together with the client.
-spec terminate(any(), #state{}) -> ok.
terminate(_Reason, State) ->
    Worker = State#state.worker_pid,
    exit(Worker, shutdown),
    ok.
642
643
% Hot code upgrade: the state is carried over unchanged.
-spec code_change(any(), #state{}, any()) -> {ok, #state{}}.
code_change(_OldVsn, UnchangedState, _Extra) ->
    {ok, UnchangedState}.
647
648
% Builds the state shown by sys:get_status/1 and in gen_server crash reports.
% Each stream queue is summarised as {Length, queue:peek_r/1 result} rather
% than dumped in full. The admin password kept in #state.args (used for
% reconnecting) is masked so it never ends up in logs.
format_status(_Opt, [_PDict, #state{stream_queues = StreamQueues, args = Args} = State]) ->
    TransformFn = fun(_Key, {Waiter, Queue}) ->
                     {Waiter, {queue:len(Queue), queue:peek_r(Queue)}}
                  end,
    RedactedArgs = case Args of
    [Name, Bucket, AdmUser, _AdmPasswd, BufferSize, Flags] ->
        [Name, Bucket, AdmUser, <<"*****">>, BufferSize, Flags];
    _ ->
        Args
    end,
    State#state{
        stream_queues = dict:map(TransformFn, StreamQueues),
        args = RedactedArgs
    }.
654
655
656% Internal functions
657
% Authenticates the freshly opened connection with SASL and advances the
% request id on success. Runs before the receive worker exists, so the
% response is read synchronously from the socket.
-spec sasl_auth(binary(), binary(), #state{}) -> {ok, #state{}} |
                            {error, sasl_auth_failed | closed | inet:posix()}.
sasl_auth(User, Passwd, State) ->
    #state{bufsocket = Sock0, timeout = Timeout, request_id = ReqId} = State,
    AuthReq = couch_dcp_consumer:encode_sasl_auth(User, Passwd, ReqId),
    case bufsocket_send(Sock0, AuthReq) of
    {error, _} = SendError ->
        SendError;
    ok ->
        case bufsocket_recv(Sock0, ?DCP_HEADER_LEN, Timeout) of
        {error, _} = HeaderError ->
            HeaderError;
        {ok, Header, Sock1} ->
            {sasl_auth, Status, ReqId, BodyLen} =
                couch_dcp_consumer:parse_header(Header),
            % Drain the body even though only Status matters, so that it is
            % not mixed up with the response to the next request.
            case bufsocket_recv(Sock1, BodyLen, Timeout) of
            {error, _} = BodyError ->
                BodyError;
            {ok, _Body, Sock2} when Status =:= ?DCP_STATUS_OK ->
                {ok, State#state{request_id = ReqId + 1, bufsocket = Sock2}};
            {ok, _Body, _Sock2} when Status =:= ?DCP_STATUS_SASL_AUTH_FAILED ->
                {error, sasl_auth_failed}
            end
        end
    end.
695
% Selects Bucket on the connection and advances the request id.
% A "not supported" answer counts as success: when authentication used the
% bucket name and password, the bucket is already selected.
% Returns {ok, State} or {error, Reason}, as the spec and init/1 expect.
-spec select_bucket(binary(), #state{}) -> {ok, #state{}} | {error, term()}.
select_bucket(Bucket, State) ->
    #state{
        bufsocket = BufSocket,
        timeout = DcpTimeout,
        request_id = RequestId
    } = State,
    SelectBucket = couch_dcp_consumer:encode_select_bucket(Bucket, RequestId),
    case bufsocket_send(BufSocket, SelectBucket) of
    ok ->
        case bufsocket_recv(BufSocket, ?DCP_HEADER_LEN, DcpTimeout) of
        {ok, Header, BufSocket2} ->
            {select_bucket, Status, RequestId, BodyLength} =
                                    couch_dcp_consumer:parse_header(Header),
            case Status of
            ?DCP_STATUS_OK ->
                {ok, State#state{
                    request_id = RequestId + 1,
                    bufsocket = BufSocket2
                }};
            _ ->
                case parse_error_response(
                        BufSocket2, DcpTimeout, BodyLength, Status) of
                % When the authentication happened with bucket name and
                % password, then the correct bucket is already selected. In
                % this case a select bucket command returns "not supported".
                {{error, not_supported}, BufSocket3} ->
                    {ok, State#state{
                        request_id = RequestId + 1,
                        bufsocket = BufSocket3
                    }};
                {{error, _} = Error, _BufSocket3} ->
                    % Unwrap the error: returning the whole
                    % {{error, _}, BufSocket} tuple broke the {error, Reason}
                    % contract and made init/1 crash with a case_clause.
                    Error
                end
            end;
        {error, _} = Error ->
            Error
        end;
    {error, _} = Error ->
        Error
    end.
737
% Sends the DCP "open connection" request named Name with Flags and checks
% that the response header answers that very request. Advances the request id.
-spec open_connection(binary(), non_neg_integer(), #state{}) ->
    {ok, #state{}} | {error, term()}.
open_connection(Name, Flags, State) ->
    #state{bufsocket = Sock, timeout = Timeout, request_id = ReqId} = State,
    OpenReq = couch_dcp_consumer:encode_open_connection(Name, Flags, ReqId),
    case bufsocket_send(Sock, OpenReq) of
    {error, _} = SendError ->
        SendError;
    ok ->
        case bufsocket_recv(Sock, ?DCP_HEADER_LEN, Timeout) of
        {error, _} = RecvError ->
            RecvError;
        {ok, Header, Sock2} ->
            {open_connection, ReqId} = couch_dcp_consumer:parse_header(Header),
            {ok, next_request_id(State#state{bufsocket = Sock2})}
        end
    end.
761
762
% Reads a snapshot-marker body of BodyLength bytes and returns its
% {StartSeq, EndSeq, Type} together with the updated buffered socket.
-spec receive_snapshot_marker(#bufsocket{}, timeout(),  size()) ->
                                     {ok, {update_seq(), update_seq(),
                                           non_neg_integer()}, #bufsocket{}} |
                                     {error, closed}.
receive_snapshot_marker(BufSocket, Timeout, BodyLength) ->
    case bufsocket_recv(BufSocket, BodyLength, Timeout) of
    {error, _} = Error ->
        Error;
    {ok, Body, RestSocket} ->
        {snapshot_marker, StartSeq, EndSeq, Type} =
            couch_dcp_consumer:parse_snapshot_marker(Body),
        {ok, {StartSeq, EndSeq, Type}, RestSocket}
    end.
776
% Reads a mutation body of BodyLength bytes and converts the parsed #mutation{}
% into a non-deleted #dcp_doc{} for partition PartId, with the Cas and
% DataType taken from the already-parsed header.
-spec receive_snapshot_mutation(#bufsocket{}, timeout(), partition_id(), size(),
                                size(), size(), uint64(), dcp_data_type()) ->
                                {#dcp_doc{}, #bufsocket{}} | {error, closed}.
receive_snapshot_mutation(BufSocket, Timeout, PartId, KeyLength, BodyLength,
        ExtraLength, Cas, DataType) ->
    case bufsocket_recv(BufSocket, BodyLength, Timeout) of
    {error, _} = Error ->
        Error;
    {ok, Body, RestSocket} ->
        {snapshot_mutation, M} = couch_dcp_consumer:parse_snapshot_mutation(
            KeyLength, Body, BodyLength, ExtraLength),
        Doc = #dcp_doc{
            id = M#mutation.key,
            body = M#mutation.value,
            data_type = DataType,
            partition = PartId,
            cas = Cas,
            rev_seq = M#mutation.rev_seq,
            seq = M#mutation.seq,
            flags = M#mutation.flags,
            expiration = M#mutation.expiration,
            deleted = false
        },
        {Doc, RestSocket}
    end.
810
% Reads a deletion body of BodyLength bytes and converts it into a deleted
% #dcp_doc{} for partition PartId. Flags and expiration are zero; the body is
% set to the XATTRs part of the parsed deletion.
-spec receive_snapshot_deletion(#bufsocket{}, timeout(), partition_id(), size(),
                                size(), uint64(), dcp_data_type()) ->
                                        {#dcp_doc{}, #bufsocket{}} |
                                        {error, closed | inet:posix()}.
receive_snapshot_deletion(BufSocket, Timeout, PartId, KeyLength, BodyLength,
        Cas, DataType) ->
    case bufsocket_recv(BufSocket, BodyLength, Timeout) of
    {error, _} = Error ->
        Error;
    {ok, Body, RestSocket} ->
        {snapshot_deletion, {Seq, RevSeq, Key, _Metadata, XATTRs}} =
            couch_dcp_consumer:parse_snapshot_deletion(KeyLength, Body),
        Doc = #dcp_doc{
            id = Key,
            body = XATTRs,
            data_type = DataType,
            partition = PartId,
            cas = Cas,
            rev_seq = RevSeq,
            seq = Seq,
            flags = 0,
            expiration = 0,
            deleted = true
        },
        {Doc, RestSocket}
    end.
837
% Reads the BodyLength-byte body of a stream end packet and returns it
% unparsed (the spec expects the 32-bit flag) together with the remaining
% buffered socket, or the socket error.
-spec receive_stream_end(#bufsocket{}, timeout(), size()) ->
            {<<_:32>>, #bufsocket{}} | {error, closed | inet:posix()}.
receive_stream_end(BufSocket, Timeout, BodyLength) ->
    case bufsocket_recv(BufSocket, BodyLength, Timeout) of
    {error, _} = Failure ->
        Failure;
    {ok, FlagBin, RestSocket} ->
        {FlagBin, RestSocket}
    end.
847
848
% Reads the body of a response that carries a failover log and returns the
% log as a list of 2-tuples {PartitionUuid, Seq}, paired with the remaining
% buffered socket.
% Errors are plain 2-tuples without a #bufsocket{}:
%   {error, no_failover_log_found} - the body is empty (BodyLength 0);
%   {error, Status}                - the status is not OK. The body is NOT
%                                    read, so the caller must skip it;
%   {error, closed | inet:posix()} - the socket failed while reading.
-spec receive_failover_log(#bufsocket{}, timeout(), char(), size()) ->
        {{'ok', list(partition_version())}, #bufsocket{}} |
        {error, no_failover_log_found | char() | closed | inet:posix()}.
receive_failover_log(_BufSocket, _Timeout, _Status, 0) ->
    {error, no_failover_log_found};
receive_failover_log(BufSocket, Timeout, ?DCP_STATUS_OK, BodyLength) ->
    case bufsocket_recv(BufSocket, BodyLength, Timeout) of
    {ok, Body, BufSocket2} ->
        {couch_dcp_consumer:parse_failover_log(Body), BufSocket2};
    {error, _} = Error ->
        Error
    end;
receive_failover_log(_BufSocket, _Timeout, Status, _BodyLength) ->
    {error, Status}.
867
% Reads the body of a rollback response, which must be exactly one
% ?DCP_SIZES_BY_SEQ-bit sequence number, and returns that sequence with the
% remaining buffered socket. Socket errors are passed through.
-spec receive_rollback_seq(#bufsocket{}, timeout(), size()) ->
        {ok, update_seq(), #bufsocket{}} | {error, closed | inet:posix()}.
receive_rollback_seq(BufSocket, Timeout, BodyLength) ->
    Received = bufsocket_recv(BufSocket, BodyLength, Timeout),
    case Received of
    {error, _} ->
        Received;
    {ok, <<Seq:?DCP_SIZES_BY_SEQ>>, RestSocket} ->
        {ok, Seq, RestSocket}
    end.
877
878
% Reads the body of one stat packet and hands it to
% couch_dcp_consumer:parse_stat/4 with the key length and the value length
% (BodyLength - KeyLength). The parse result ({ok, {Key, Value}} or
% {error, {Status, Value}}) is paired with the remaining buffered socket.
% A socket failure is returned as a plain {error, Reason}.
-spec receive_stat(#bufsocket{}, timeout(), dcp_status(), size(), size()) ->
        {{ok, {binary(), binary()}} | {error, {dcp_status(), binary()}},
            #bufsocket{}} |
        {error, closed | inet:posix()}.
receive_stat(BufSocket, Timeout, Status, BodyLength, KeyLength) ->
    case bufsocket_recv(BufSocket, BodyLength, Timeout) of
    {ok, Body, BufSocket2} ->
        {couch_dcp_consumer:parse_stat(
            Body, Status, KeyLength, BodyLength - KeyLength), BufSocket2};
    {error, _} = Error ->
        Error
    end.
892
% Reads the body of an all_seqs response and parses it, together with the
% response Status, via couch_dcp_consumer:parse_all_seqs/3. The parse result
% is paired with the remaining buffered socket; a socket failure is returned
% as a plain {error, Reason}.
-spec receive_all_seqs(#bufsocket{}, timeout(), dcp_status(), size()) ->
        {{ok, list()} | {error, {dcp_status(), binary()}}, #bufsocket{}} |
        {error, closed | inet:posix()}.
receive_all_seqs(BufSocket, Timeout, Status, BodyLength) ->
    case bufsocket_recv(BufSocket, BodyLength, Timeout) of
    {ok, Body, BufSocket2} ->
        {couch_dcp_consumer:parse_all_seqs(Status, Body, []), BufSocket2};
    {error, _} = Error ->
        Error
    end.
904
% Pulls events for stream RequestId from Pid one at a time and folds
% CallbackFn over them. Snapshot markers reach the callback as
% {snapshot_marker, Data}; every other event passes only its payload.
% Stops with {ok, Acc} on stream_end, or with {error, Reason} on an error
% event.
-spec receive_events(pid(), request_id(), mutations_fold_fun(),
                     mutations_fold_acc()) -> {ok, mutations_fold_acc()} |
                                              {error, term()}.
receive_events(Pid, RequestId, CallbackFn, InAcc) ->
    case get_stream_event(Pid, RequestId) of
    {stream_end, _} ->
        {ok, InAcc};
    {error, Reason} ->
        {error, Reason};
    {snapshot_marker, _} = Marker ->
        receive_events(Pid, RequestId, CallbackFn, CallbackFn(Marker, InAcc));
    {_OtherType, Payload} ->
        receive_events(Pid, RequestId, CallbackFn, CallbackFn(Payload, InAcc))
    end.
922
% Writes Data to the raw TCP socket; returns gen_tcp:send/2's result.
-spec socket_send(socket(), iodata()) ->
        ok | {error, closed | inet:posix()}.
socket_send(Sock, Data) ->
    gen_tcp:send(Sock, Data).
927
% Sends Packet over the TCP socket wrapped by BufSocket. Only receiving is
% buffered; sends go straight to the socket.
-spec bufsocket_send(#bufsocket{}, iodata()) ->
        ok | {error, closed | inet:posix()}.
bufsocket_send(BufSocket, Packet) ->
    Sock = ?SOCKET(BufSocket),
    socket_send(Sock, Packet).
932
% Waits up to Timeout for the next tcp message for SockPid from this
% process' mailbox. On timeout it returns {ok, <<>>} rather than an error,
% so bufsocket_recv/3 simply appends nothing and waits again.
-spec socket_recv(socket(), timeout()) ->
        {ok, binary()} | {error, closed | inet:posix()}.
socket_recv(SockPid, Timeout) ->
    receive
    {tcp_error, SockPid, Reason} ->
        {error, Reason};
    {tcp_closed, SockPid} ->
        {error, closed};
    {tcp, SockPid, Data} ->
        {ok, Data}
    after Timeout ->
        {ok, <<>>}
    end.
946
% Returns exactly Length bytes from the buffered socket. Bytes already in
% the buffer are used first; more tcp data is received until enough has
% arrived. Anything beyond Length stays buffered in the returned
% #bufsocket{} for the next call.
-spec bufsocket_recv(#bufsocket{}, size(), timeout()) ->
        {ok, binary(), #bufsocket{}} | {error, closed | inet:posix()}.
bufsocket_recv(BufSocket, 0, _Timeout) ->
    {ok, <<>>, BufSocket};
bufsocket_recv(#bufsocket{sockbuf = Buffered} = BufSocket, Length, _Timeout)
        when byte_size(Buffered) >= Length ->
    <<Wanted:Length/binary, Leftover/binary>> = Buffered,
    {ok, Wanted, BufSocket#bufsocket{sockbuf = Leftover}};
bufsocket_recv(#bufsocket{sockbuf = Buffered} = BufSocket, Length, Timeout) ->
    case socket_recv(?SOCKET(BufSocket), Timeout) of
    {ok, More} ->
        Grown = BufSocket#bufsocket{sockbuf = <<Buffered/binary, More/binary>>},
        bufsocket_recv(Grown, Length, Timeout);
    {error, _} = Failure ->
        Failure
    end.
968
% Remembers that RequestId is awaiting a response: stores {ReqInfo, From}
% under RequestId in pending_requests.
-spec add_pending_request(#state{}, request_id(), term(), nil | {pid(), term()}) -> #state{}.
add_pending_request(#state{pending_requests = Pending} = State, RequestId,
                    ReqInfo, From) ->
    State#state{
        pending_requests = dict:store(RequestId, {ReqInfo, From}, Pending)
    }.
976
% Forgets the pending request RequestId (a no-op if it is not pending).
-spec remove_pending_request(#state{}, request_id()) -> #state{}.
remove_pending_request(#state{pending_requests = Pending} = State, RequestId) ->
    State#state{pending_requests = dict:erase(RequestId, Pending)}.
983
984
% Looks up the {ReqInfo, From} entry stored for RequestId; nil if none.
-spec find_pending_request(#state{}, request_id()) -> nil | {term(), nil | {pid(), term()}}.
find_pending_request(#state{pending_requests = Pending}, RequestId) ->
    case dict:find(RequestId, Pending) of
    {ok, Entry} ->
        Entry;
    error ->
        nil
    end.
996
% Advances request_id by one, wrapping back to 0 once the next value would
% no longer fit in the ?DCP_SIZES_OPAQUE-bit opaque field.
-spec next_request_id(#state{}) -> #state{}.
next_request_id(#state{request_id = Current} = State) ->
    Next = if
    Current + 1 < (1 bsl ?DCP_SIZES_OPAQUE) ->
        Current + 1;
    true ->
        0
    end,
    State#state{request_id = Next}.
1006
% Drops stream RequestId from the active streams and deletes its event
% queue. When this was the last active stream, all bytes still unacked are
% acknowledged first (a failed send crashes on the {ok, _} match).
-spec remove_request_queue(#state{}, request_id()) -> #state{}.
remove_request_queue(State, RequestId) ->
    #state{
        active_streams = ActiveStreams,
        stream_queues = StreamQueues
    } = State,
    Remaining = lists:keydelete(RequestId, 2, ActiveStreams),
    AckedState = case Remaining of
    [] ->
        % All active streams have finished reading:
        % ack whatever is still unacked
        {ok, Acked} = send_buffer_ack(State),
        Acked;
    _ ->
        State
    end,
    AckedState#state{
        active_streams = Remaining,
        stream_queues = dict:erase(RequestId, StreamQueues)
    }.
1029
1030
% Registers a new stream: records {PartId, RequestId} as active and creates
% an empty event queue, with no waiter, for RequestId.
-spec add_request_queue(#state{}, partition_id(), request_id()) -> #state{}.
add_request_queue(#state{active_streams = Active, stream_queues = Queues} = State,
                  PartId, RequestId) ->
    NewQueues = dict:store(RequestId, {nil, queue:new()}, Queues),
    State#state{
        active_streams = [{PartId, RequestId} | Active],
        stream_queues = NewQueues
    }.
1043
1044
% Appends Event to the back of RequestId's event queue, keeping its waiter.
% Crashes if RequestId has no queue.
-spec enqueue_stream_event(#state{}, request_id(), tuple()) -> #state{}.
enqueue_stream_event(#state{stream_queues = Queues} = State, RequestId, Event) ->
    {ok, {Waiter, Pending}} = dict:find(RequestId, Queues),
    Updated = dict:store(RequestId, {Waiter, queue:in(Event, Pending)}, Queues),
    State#state{stream_queues = Updated}.
1057
% Pops the oldest event from RequestId's queue and returns it with the
% updated state. Crashes if the stream is unknown or its queue is empty.
-spec dequeue_stream_event(#state{}, request_id()) ->
                               {#state{}, tuple()}.
dequeue_stream_event(#state{stream_queues = Queues} = State, RequestId) ->
    {ok, {Waiter, EvQueue}} = dict:find(RequestId, Queues),
    {{value, Oldest}, Rest} = queue:out(EvQueue),
    NewQueues = dict:store(RequestId, {Waiter, Rest}, Queues),
    {State#state{stream_queues = NewQueues}, Oldest}.
1071
% Returns the oldest event in RequestId's queue without removing it.
% Crashes if the stream is unknown or its queue is empty.
-spec peek_stream_event(#state{}, request_id()) -> tuple().
peek_stream_event(#state{stream_queues = Queues}, RequestId) ->
    {ok, {_Waiter, EvQueue}} = dict:find(RequestId, Queues),
    {value, Oldest} = queue:peek(EvQueue),
    Oldest.
1080
% Registers NewWaiter as the waiter for RequestId's events. Only one waiter
% is allowed per stream: returns nil if one is already registered.
-spec add_stream_event_waiter(#state{}, request_id(), term()) -> #state{} | nil.
add_stream_event_waiter(#state{stream_queues = Queues} = State, RequestId,
                        NewWaiter) ->
    case dict:find(RequestId, Queues) of
    {ok, {nil, EvQueue}} ->
        State#state{
            stream_queues = dict:store(RequestId, {NewWaiter, EvQueue}, Queues)
        };
    {ok, {_ExistingWaiter, _}} ->
        nil
    end.
1096
1097
% true if RequestId has at least one queued event, false if its queue is
% empty, nil if the stream is unknown.
-spec stream_event_present(#state{}, request_id()) -> nil | true | false.
stream_event_present(#state{stream_queues = Queues}, RequestId) ->
    case dict:find(RequestId, Queues) of
    {ok, {_Waiter, EvQueue}} ->
        not queue:is_empty(EvQueue);
    error ->
        nil
    end.
1109
1110
% true if a waiter is registered for RequestId, false if not, nil if the
% stream is unknown.
-spec stream_event_waiters_present(#state{}, request_id()) -> nil | true | false.
stream_event_waiters_present(#state{stream_queues = Queues}, RequestId) ->
    case dict:find(RequestId, Queues) of
    {ok, {nil, _}} ->
        false;
    {ok, {_Waiter, _}} ->
        true;
    error ->
        nil
    end.
1122
1123
% Clears RequestId's waiter (leaving its queue intact) and returns the
% updated state together with the waiter that was registered.
-spec remove_stream_event_waiter(#state{}, request_id()) -> {#state{}, term()}.
remove_stream_event_waiter(#state{stream_queues = Queues} = State, RequestId) ->
    {ok, {Waiter, EvQueue}} = dict:find(RequestId, Queues),
    Cleared = dict:store(RequestId, {nil, EvQueue}, Queues),
    {State#state{stream_queues = Cleared}, Waiter}.
1134
1135
% Returns the request id of the active stream for partition PartId, or nil
% if that partition has no active stream.
-spec find_stream_req_id(#state{}, partition_id()) -> request_id() | nil.
find_stream_req_id(#state{active_streams = ActiveStreams}, PartId) ->
    case lists:keyfind(PartId, 1, ActiveStreams) of
    false ->
        nil;
    {PartId, StreamReqId} ->
        StreamReqId
    end.
1147
% Reads and discards the BodyLength-byte body of an error response, so the
% next header is read from the right offset, and maps Status to an error
% term via status_to_error/1. Returns that error paired with the remaining
% buffered socket, or a plain {error, Reason} if the socket fails.
-spec parse_error_response(#bufsocket{}, timeout(), integer(), integer()) ->
        {{'error', atom() | {'status', integer()}}, #bufsocket{}} |
        {'error', closed | inet:posix()}.
parse_error_response(BufSocket, Timeout, BodyLength, Status) ->
    case bufsocket_recv(BufSocket, BodyLength, Timeout) of
    {ok, _Body, BufSocket2} ->
        {status_to_error(Status), BufSocket2};
    {error, _} = Error ->
        Error
    end.
1158
% Maps a non-OK DCP response status to the error term handed to callers.
% Known statuses get a descriptive atom; any other status is reported as
% {error, {status, Status}}.
-spec status_to_error(integer()) -> {'error', atom() | {'status', integer()}}.
status_to_error(Status) ->
    {error, status_error_reason(Status)}.

% The reason part of status_to_error/1's result.
-spec status_error_reason(integer()) -> atom() | {'status', integer()}.
status_error_reason(?DCP_STATUS_KEY_NOT_FOUND) -> vbucket_stream_not_found;
status_error_reason(?DCP_STATUS_ERANGE) -> wrong_start_sequence_number;
status_error_reason(?DCP_STATUS_KEY_EEXISTS) -> vbucket_stream_already_exists;
status_error_reason(?DCP_STATUS_NOT_MY_VBUCKET) -> server_not_my_vbucket;
status_error_reason(?DCP_STATUS_TMP_FAIL) -> vbucket_stream_tmp_fail;
status_error_reason(?DCP_STATUS_NOT_SUPPORTED) -> not_supported;
status_error_reason(Other) -> {status, Other}.
1174
1175
% The worker process for handling the dcp connection downstream pipe.
% It reads and parses downstream packets and sends the results to the
% gen_server process Parent.
% MsgAcc0 accumulates stat entries across consecutive stat packets. They
% are sent to Parent in a single {stream_response, ...} message when the
% empty-body stat packet that terminates the list arrives.
% The function never returns: it loops until a socket read fails and then
% exits with {conn_error, Reason}.
-spec receive_worker(#bufsocket{}, timeout(), pid(), list()) -> no_return().
receive_worker(BufSocket, Timeout, Parent, MsgAcc0) ->
    case bufsocket_recv(BufSocket, ?DCP_HEADER_LEN, infinity) of
    {ok, Header, BufSocket2} ->
        {Action, MsgAcc, BufSocket3} =
        case couch_dcp_consumer:parse_header(Header) of
        {control_request, Status, RequestId} ->
            {done, {stream_response, RequestId, {RequestId, Status}},
                BufSocket2};
        {noop_request, RequestId} ->
            {done, {stream_noop, RequestId}, BufSocket2};
        {buffer_ack, ?DCP_STATUS_OK, _RequestId} ->
            % NOTE(review): this resets the accumulator; it would drop
            % buffered stats if an ack ever interleaved with a stats reply
            {true, [], BufSocket2};
        {stream_request, Status, RequestId, BodyLength} ->
            {Response, BufSocket5} = case Status of
            ?DCP_STATUS_OK ->
                case receive_failover_log(
                    BufSocket2, Timeout, Status, BodyLength) of
                {{ok, FailoverLog}, BufSocket4} ->
                    {{failoverlog, FailoverLog}, BufSocket4};
                Error ->
                    {Error, BufSocket2}
                end;
            ?DCP_STATUS_ROLLBACK ->
                case receive_rollback_seq(
                    BufSocket2, Timeout, BodyLength) of
                {ok, RollbackSeq, BufSocket4} ->
                    {{rollback, RollbackSeq}, BufSocket4};
                Error ->
                    {Error, BufSocket2}
                end;
            _ ->
                parse_error_response(BufSocket2, Timeout, BodyLength, Status)
            end,
            {done, {stream_response, RequestId, {RequestId, Response}},
                BufSocket5};
        {failover_log, Status, RequestId, BodyLength} ->
            % receive_failover_log/4 returns {Result, #bufsocket{}} on
            % success but a bare {error, _} 2-tuple otherwise, so the error
            % must not be destructured as {Response, BufSocket}.
            {Response, BufSocket5} = case receive_failover_log(
                BufSocket2, Timeout, Status, BodyLength) of
            {FailoverLogResp, #bufsocket{} = BufSocket4} ->
                {FailoverLogResp, BufSocket4};
            {error, _} = Error when BodyLength =:= 0;
                    Status =/= ?DCP_STATUS_OK ->
                % A protocol-level error (empty log or non-OK status).
                % The body of a non-OK response has not been read; skip it
                % so the next header is read at the right offset
                % (skipping 0 bytes is a no-op).
                {Error, skip_response_body(BufSocket2, BodyLength, Timeout)};
            {error, Reason} ->
                % The socket failed while the body was being read
                exit({conn_error, Reason})
            end,
            {done, {stream_response, RequestId, Response}, BufSocket5};
        {stream_close, Status, RequestId, BodyLength} ->
            {Response, BufSocket5} = case Status of
            ?DCP_STATUS_OK ->
                {ok, BufSocket2};
            _ ->
                parse_error_response(BufSocket2, Timeout, BodyLength, Status)
            end,
            {done, {stream_response, RequestId, Response}, BufSocket5};
        {stats, Status, RequestId, BodyLength, KeyLength} ->
            case BodyLength of
            0 ->
                case Status of
                ?DCP_STATUS_OK ->
                    StatAcc = lists:reverse(MsgAcc0),
                    {done, {stream_response, RequestId, {ok, StatAcc}},
                        BufSocket2};
                % Some errors might not contain a body
                _ ->
                    Error = {error, {Status, <<>>}},
                    {done, {stream_response, RequestId, Error}, BufSocket2}
                end;
            _ ->
                case receive_stat(
                    BufSocket2, Timeout, Status, BodyLength, KeyLength) of
                {{ok, Stat}, BufSocket5} ->
                    {true, [Stat | MsgAcc0], BufSocket5};
                {{error, _} = Error, BufSocket5} ->
                    % The packet carried an error (non-OK status with a
                    % body). Its body has been consumed, so continue from
                    % BufSocket5 and report the error to the requester.
                    {done, {stream_response, RequestId, Error}, BufSocket5};
                {error, _} = Error ->
                    {done, {stream_response, RequestId, Error}, BufSocket2}
                end
            end;
        {snapshot_marker, _PartId, RequestId, BodyLength} ->
            {ok, SnapshotMarker, BufSocket5} = receive_snapshot_marker(
                BufSocket2, Timeout, BodyLength),
            {done, {stream_event, RequestId,
                {snapshot_marker, SnapshotMarker, BodyLength}}, BufSocket5};
        {snapshot_mutation, PartId, RequestId, KeyLength, BodyLength,
                ExtraLength, Cas, DataType} ->
            {Mutation, BufSocket5} = receive_snapshot_mutation(
                BufSocket2, Timeout, PartId, KeyLength, BodyLength, ExtraLength,
                Cas, DataType),
            {done, {stream_event, RequestId,
                    {snapshot_mutation, Mutation, BodyLength}}, BufSocket5};
        % For the indexer and XDCR there's no difference between a deletion
        % and an expiration. In both cases the items should get removed.
        % Hence the same code can be used after the initial header
        % parsing (the body is the same).
        {OpCode, PartId, RequestId, KeyLength, BodyLength, Cas, DataType} when
                OpCode =:= snapshot_deletion orelse
                OpCode =:= snapshot_expiration ->
            {Deletion, BufSocket5} = receive_snapshot_deletion(
                BufSocket2, Timeout, PartId, KeyLength, BodyLength,
                Cas, DataType),
            {done, {stream_event, RequestId,
                {snapshot_deletion, Deletion, BodyLength}}, BufSocket5};
        {stream_end, PartId, RequestId, BodyLength} ->
            {Flag, BufSocket5} = receive_stream_end(BufSocket2,
                Timeout, BodyLength),
            {done, {stream_event, RequestId, {stream_end,
                {RequestId, PartId, Flag}, BodyLength}}, BufSocket5};
        {all_seqs, Status, RequestId, BodyLength} ->
            {Resp, BufSocket5} = receive_all_seqs(
                BufSocket2, Timeout, Status, BodyLength),
            {done, {stream_response, RequestId, Resp}, BufSocket5}
        end,
        case Action of
        done ->
            Parent ! MsgAcc,
            receive_worker(BufSocket3, Timeout, Parent, []);
        true ->
            receive_worker(BufSocket3, Timeout, Parent, MsgAcc)
        end;
    {error, Reason} ->
        exit({conn_error, Reason})
    end.

% Reads and discards BodyLength bytes from BufSocket. A socket failure ends
% the worker the same way a failed header read in receive_worker/4 does.
-spec skip_response_body(#bufsocket{}, size(), timeout()) -> #bufsocket{}.
skip_response_body(BufSocket, BodyLength, Timeout) ->
    case bufsocket_recv(BufSocket, BodyLength, Timeout) of
    {ok, _Body, BufSocket2} ->
        BufSocket2;
    {error, Reason} ->
        exit({conn_error, Reason})
    end.
1293
% Check whether a buffer ack needs to be sent to the server, and send it if
% so. Type selects how many bytes are added to the unacked total
% (total_buffer_size):
%   mutation      - the wire size of Event itself;
%   remove_stream - the wire size of every event still queued for
%                   RequestId (nothing if the stream has no queue).
% Once the total exceeds ?DCP_BUFFER_ACK_THRESHOLD of max_buffer_size, the
% whole total is acked and reset to 0. Error events ({error, _, _}) never
% count towards the total.
% A failed send is returned as {error, Reason}, the same shape that
% send_buffer_ack/1 uses.
-spec check_and_send_buffer_ack(#state{}, request_id(), tuple() | nil, atom()) ->
                        {ok, #state{}} | {error, closed | inet:posix()}.
check_and_send_buffer_ack(State, _RequestId, {error, _, _}, _Type) ->
    {ok, State};

check_and_send_buffer_ack(State, RequestId, Event, Type) ->
    #state{
        bufsocket = BufSocket,
        max_buffer_size = MaxBufSize,
        stream_queues = StreamQueues,
        total_buffer_size = Size
    } = State,
    Size2 = case Type of
    remove_stream ->
        case dict:find(RequestId, StreamQueues) of
        error ->
            Size;
        {ok, {_, EvQueue}} ->
            get_queue_size(EvQueue, Size)
        end;
    mutation ->
        Size + get_event_size(Event)
    end,
    MaxAckSize = MaxBufSize * ?DCP_BUFFER_ACK_THRESHOLD,
    if
    Size2 > MaxAckSize ->
        BufferAckRequest = couch_dcp_consumer:encode_buffer_request(0, Size2),
        case bufsocket_send(BufSocket, BufferAckRequest) of
        ok ->
            {ok, State#state{total_buffer_size = 0}};
        {error, _Reason} = Error ->
            Error
        end;
    Size2 == 0 ->
        % Nothing to account for
        {ok, State};
    true ->
        {ok, State#state{total_buffer_size = Size2}}
    end.
1345
% Acknowledges every byte counted in total_buffer_size to the server
% (even when that is 0) and resets the counter on success.
-spec send_buffer_ack(#state{}) ->
            {ok, #state{}} | {error, closed | inet:posix()}.
send_buffer_ack(#state{bufsocket = BufSocket, total_buffer_size = Unacked} = State) ->
    AckPacket = couch_dcp_consumer:encode_buffer_request(0, Unacked),
    case bufsocket_send(BufSocket, AckPacket) of
    {error, _Reason} = Error ->
        Error;
    ok ->
        {ok, State#state{total_buffer_size = 0}}
    end.
1360
1361
% Sends a connection control request announcing a buffer size of Size.
% On success, the request is tracked as a pending {control_request, Size}
% with no reply target, the request id is advanced, and max_buffer_size is
% updated right away (without waiting for the server's response).
-spec set_buffer_size(#state{}, non_neg_integer()) -> {ok ,#state{}} |
                                            {error, closed | inet:posix()}.
set_buffer_size(#state{bufsocket = BufSocket, request_id = ReqId} = State, Size) ->
    Request = couch_dcp_consumer:encode_control_request(ReqId, connection, Size),
    case bufsocket_send(BufSocket, Request) of
    {error, _} = SendError ->
        SendError;
    ok ->
        Advanced = next_request_id(State),
        Tracked = add_pending_request(Advanced, ReqId, {control_request, Size}, nil),
        {ok, Tracked#state{max_buffer_size = Size}}
    end.
1379
% Adds the wire size of every event in EvQueue to the running total Size.
-spec get_queue_size(queue(), non_neg_integer()) -> non_neg_integer().
get_queue_size(EvQueue, Size) ->
    lists:foldl(fun(Event, Total) -> Total + get_event_size(Event) end,
                Size, queue:to_list(EvQueue)).
1389
% Number of bytes an event occupied on the wire: the fixed DCP header plus
% the body length recorded when the packet was read. The payload is never
% inspected. Besides #dcp_doc{} it can be a snapshot marker or stream end
% tuple, or an error reason such as dcp_conn_closed.
-spec get_event_size({atom(), term(), non_neg_integer()}) -> non_neg_integer().
get_event_size({_Type, _Payload, BodyLength}) ->
    ?DCP_HEADER_LEN + BodyLength.
1393
% Deletes the stream_info entry of the active stream on partition PartId.
% The state is returned unchanged if PartId has no active stream.
-spec remove_stream_info(partition_id(), #state{}) -> #state{}.
remove_stream_info(PartId, #state{active_streams = ActiveStreams,
                                  stream_info = StreamData} = State) ->
    case lists:keyfind(PartId, 1, ActiveStreams) of
    {_, RequestId} ->
        State#state{stream_info = dict:erase(RequestId, StreamData)};
    false ->
        State
    end.
1407
% Stores a fresh #stream_info{} describing the stream request RequestId
% (partition, uuid, sequence range and flags), replacing any previous entry.
-spec insert_stream_info(partition_id(), request_id(), uuid(), non_neg_integer(),
                        non_neg_integer(), #state{}, non_neg_integer()) -> #state{}.
insert_stream_info(PartId, RequestId, PartUuid, StartSeq, EndSeq,
                   #state{stream_info = StreamData} = State, Flags) ->
    Info = #stream_info{
        part_id = PartId,
        part_uuid = PartUuid,
        start_seq = StartSeq,
        end_seq = EndSeq,
        flags = Flags
    },
    State#state{stream_info = dict:store(RequestId, Info, StreamData)}.
1421
% Returns the #stream_info{} stored for RequestId, or nil if there is none.
-spec find_stream_info(request_id(), #state{}) -> #stream_info{} | nil.
find_stream_info(RequestId, #state{stream_info = StreamData}) ->
    case dict:find(RequestId, StreamData) of
    error ->
        nil;
    {ok, Info} ->
        Info
    end.
1433
% Sends a stream request for the given partition, sequence range and
% snapshot range. On success, it records the stream's info, advances the
% request id, and registers the request as pending {add_stream, PartId} so
% the response can be routed back to From.
% A failed send is returned as {error, Reason}, matching the spec and the
% other send helpers.
-spec add_new_stream({partition_id(), uuid(), update_seq(), update_seq(),
        {update_seq(), update_seq()}, 0..255},
        {pid(), term()}, #state{}) -> #state{} | {error, closed | inet:posix()}.
add_new_stream({PartId, PartUuid, StartSeq, EndSeq,
    {SnapSeqStart, SnapSeqEnd}, Flags}, From, State) ->
    #state{
        bufsocket = BufSocket,
        request_id = RequestId
    } = State,
    StreamRequest = couch_dcp_consumer:encode_stream_request(
        PartId, RequestId, Flags, StartSeq, EndSeq, PartUuid, SnapSeqStart, SnapSeqEnd),
    case bufsocket_send(BufSocket, StreamRequest) of
    ok ->
        State2 = insert_stream_info(PartId, RequestId, PartUuid, StartSeq, EndSeq,
            State, Flags),
        State3 = next_request_id(State2),
        add_pending_request(State3, RequestId, {add_stream, PartId}, From);
    {error, _Reason} = Error ->
        Error
    end.
1454
% Re-establishes the DCP connection by running init/1 again with the
% original Args, then moves the new socket and worker pid into State.
% Every request pending on the old connection is failed with
% {error, dcp_conn_closed}. Every stream present in stream_info gets a
% {error, dcp_conn_closed, 0} stream event posted to this process.
% If init/1 returns {stop, Reason}, the gen_server stops with that reason.
-spec restart_worker(#state{}) -> {noreply, #state{}} | {stop, term(), #state{}}.
restart_worker(State) ->
    #state{
        args = Args
    } = State,
    case init(Args) of
    {stop, Reason} ->
        {stop, Reason, State};
    {ok, State2} ->
        #state{
            bufsocket = BufSocket,
            worker_pid = WorkerPid
        } = State2,
        % Replace the socket and drop the old connection's pending requests
        State3 = State#state{
            bufsocket = BufSocket,
            pending_requests = dict:new(),
            worker_pid = WorkerPid
        },
        Error = {error, dcp_conn_closed},
        % Traversed only for its side effects, so fold rather than build a
        % throw-away dict with dict:map/2
        dict:fold(fun(_RequestId, {ReqInfo, SendTo}, Acc) ->
            case ReqInfo of
            get_stats ->
                {MRef, From} = SendTo,
                From ! {get_stats, MRef, Error};
            all_seqs ->
                {MRef, From} = SendTo,
                From ! {all_seqs, MRef, Error};
            {control_request, _} ->
                % Registered without a reply target (From is nil)
                ok;
            _ ->
                gen_server:reply(SendTo, Error)
            end,
            Acc
        end, ok, State#state.pending_requests),
        lists:foreach(fun(RequestId) ->
            self() ! {stream_event, RequestId, {error, dcp_conn_closed, 0}}
        end, dict:fetch_keys(State#state.stream_info)),
        {noreply, State3}
    end.
1495
% Records {StartSeq, EndSeq} of a received snapshot marker as the
% snapshot_seq of stream RequestId. Unknown streams are ignored.
-spec store_snapshot_seq(request_id(), {update_seq(), update_seq(),
                            non_neg_integer()}, #state{}) -> #state{}.
store_snapshot_seq(RequestId, {StartSeq, EndSeq, _Type}, State) ->
    #state{stream_info = StreamData} = State,
    case dict:find(RequestId, StreamData) of
    error ->
        State;
    {ok, Info} ->
        Info2 = Info#stream_info{snapshot_seq = {StartSeq, EndSeq}},
        State#state{stream_info = dict:store(RequestId, Info2, StreamData)}
    end.
1511
% Records the sequence number of a received document as the start_seq of
% stream RequestId. Unknown streams are ignored.
-spec store_snapshot_mutation(request_id(), #dcp_doc{}, #state{}) -> #state{}.
store_snapshot_mutation(RequestId, #dcp_doc{seq = Seq}, State) ->
    #state{stream_info = StreamData} = State,
    case dict:find(RequestId, StreamData) of
    {ok, Info} ->
        Info2 = Info#stream_info{start_seq = Seq},
        State#state{stream_info = dict:store(RequestId, Info2, StreamData)};
    error ->
        State
    end.
1528
% Strips the wire body length, which is only needed for buffer-ack
% accounting, from a queued event: {Type, Payload, BodyLength} becomes
% {Type, Payload}. Payload is typically a #dcp_doc{} or a marker/stream-end
% tuple, but is passed through untouched whatever it is.
-spec remove_body_len({atom(), term(), non_neg_integer()}) -> {atom(), term()}.
remove_body_len({Type, Data, _BodyLength}) ->
    {Type, Data}.
1533