1% Licensed under the Apache License, Version 2.0 (the "License"); you may not
2% use this file except in compliance with the License. You may obtain a copy of
3% the License at
4%
5%   http://www.apache.org/licenses/LICENSE-2.0
6%
7% Unless required by applicable law or agreed to in writing, software
8% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10% License for the specific language governing permissions and limitations under
11% the License.
12
13-module(couch_dcp_client).
14-behaviour(gen_server).
15
16% Public API
17-export([start/6]).
18-export([add_stream/6, get_seqs/2, get_num_items/2,
19    get_failover_log/2]).
20-export([get_stream_event/2, remove_stream/2, list_streams/1]).
21-export([enum_docs_since/8, restart_worker/1]).
22-export([get_seqs_async/1]).
23
24% gen_server callbacks
25-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2, code_change/3]).
26-export([format_status/2]).
27
28-include("couch_db.hrl").
29-include_lib("couch_dcp/include/couch_dcp.hrl").
30-include_lib("couch_dcp/include/couch_dcp_typespecs.hrl").
% Default value of #state.max_buffer_size (10485760 = 10 * 1024 * 1024).
-define(MAX_BUF_SIZE, (10 * 1024 * 1024)).
% Timeout (ms) of the blocking add_stream/remove_stream calls; also the
% interval after which a wait for a stream event logs and starts over.
-define(TIMEOUT, 60000).
% Interval (ms) after which a wait for a stats / all-seqs reply logs and
% starts over.
-define(TIMEOUT_STATS, 2000).
% Pause (ms) taken before the receive worker is restarted after a
% connection error.
-define(DCP_RETRY_TIMEOUT, 2000).
% Value given to the `buffer' option of the TCP socket.
-define(DCP_BUF_WINDOW, 65535).
% Extracts the raw socket out of a #bufsocket{} record.
-define(SOCKET(BufSocket), BufSocket#bufsocket.sockpid).
37
% Callback handed to enum_docs_since/8 and the accumulator that is threaded
% through it.
-type mutations_fold_fun() :: function().
-type mutations_fold_acc() :: term().
40
% A socket bundled with a binary buffer.
% NOTE(review): sockbuf presumably holds bytes that were received but not yet
% consumed - confirm against bufsocket_recv.
-record(bufsocket, {
    % The underlying socket; nil as long as none is attached
    sockpid = nil :: socket() | nil,
    % Buffer that travels together with the socket
    sockbuf = <<>> :: binary()
}).
45
% State of the DCP client gen_server.
-record(state, {
    % Buffered socket used to talk to the DCP server
    bufsocket = nil                 :: #bufsocket{} | nil,
    % Timeout used for the connection and for socket receives
    timeout = 5000                  :: timeout(),
    % Request id used for the next outgoing request
    request_id = 0                  :: request_id(),
    % Requests that were sent and still wait for their response
    pending_requests = dict:new()   :: dict(),
    % Per stream request id: {Waiter, EventQueue}
    stream_queues = dict:new()      :: dict(),
    % 3-tuples of {PartId, RequestId, MonitorRef}
    active_streams = []             :: list(),
    % Pid of the receive worker. init/1 builds the state before the worker is
    % spawned, hence the field really is `undefined' for a while; the default
    % and the type now say so explicitly.
    worker_pid = undefined          :: pid() | undefined,
    % Buffer limit; overwritten once the server answers the control request
    max_buffer_size = ?MAX_BUF_SIZE :: integer(),
    % Running buffer counter (inspected and reset by the unit tests)
    total_buffer_size = 0           :: non_neg_integer(),
    % Per stream request id meta information
    stream_info = dict:new()        :: dict(),
    % The arguments init/1 was started with, kept for reconnecting
    args = []                       :: list()
}).
59
% Meta information kept for a single partition stream.
-record(stream_info, {
    % Partition the stream belongs to
    part_id = 0 :: partition_id(),
    % UUID of the partition version
    part_uuid = 0 :: uuid(),
    % Sequence number range of the stream
    start_seq = 0 :: update_seq(),
    end_seq = 0 :: update_seq(),
    % Pair of sequence numbers describing the snapshot
    snapshot_seq = {0, 0} :: {update_seq(), update_seq()},
    % Flags of the stream request
    flags = 0 :: non_neg_integer()
}).
68
% This gen_server implements a DCP client with vbucket stream multiplexing.
% The client spawns a worker process to handle all response messages and event
% messages received from the DCP server. For simplicity, these messages are
% classified into two types: stream_response and stream_event. For any DCP
% request, the corresponding response message is called a stream_response. A
% vbucket stream request, however, works like a subscription: once the
% "stream initiated" response has arrived, the server starts sending vbucket
% events. Messages of this type are called stream_event.
%
% The user can request a stream using the add_stream() API and the corresponding
% events can be received using the get_stream_event() API.
79
80
81% Public API
82
% Starts a DCP client gen_server that is linked to the caller. The six
% arguments are handed to init/1 as a list, in the same order.
-spec start(binary(), binary(), binary(), binary(), non_neg_integer(),
            non_neg_integer()) ->
    {ok, pid()} | ignore | {error, {already_started, pid()} | term()}.
start(Name, Bucket, AdmUser, AdmPasswd, BufferSize, Flags) ->
    InitArgs = [Name, Bucket, AdmUser, AdmPasswd, BufferSize, Flags],
    gen_server:start_link(?MODULE, InitArgs, []).
87
% Requests a stream for one partition. Blocks for at most ?TIMEOUT ms until
% the client process replies.
-spec add_stream(pid(), partition_id(), uuid(), update_seq(), update_seq(),
                 dcp_data_type()) ->
    {error, term()} | {request_id(), term()}.
add_stream(Pid, PartId, PartUuid, StartSeq, EndSeq, Flags) ->
    Request = {add_stream, PartId, PartUuid, StartSeq, EndSeq, Flags},
    gen_server:call(Pid, Request, ?TIMEOUT).
93
94
% Asks the client to close the stream of the given partition. Blocks for at
% most ?TIMEOUT ms until the client process replies.
-spec remove_stream(pid(), partition_id()) -> 'ok' | {'error', term()}.
remove_stream(Pid, PartId) ->
    Request = {remove_stream, PartId},
    gen_server:call(Pid, Request, ?TIMEOUT).
99
100
% Returns the partition ids of the streams the client currently tracks as
% active.
-spec list_streams(pid()) -> list().
list_streams(Pid) ->
    Request = list_streams,
    gen_server:call(Pid, Request).
104
105
% Waits for the `get_stats' answer that is tagged with MRef. Whenever
% ?TIMEOUT_STATS ms pass without an answer, an error is logged and the wait
% starts over. The calling process exits when the monitored client dies.
-spec get_stats_reply(pid(), reference()) -> term().
get_stats_reply(Pid, MRef) ->
    receive
        {get_stats, MRef, Stats} ->
            Stats;
        {'DOWN', MRef, process, Pid, DownReason} ->
            exit({dcp_client_died, Pid, DownReason})
    after ?TIMEOUT_STATS ->
        WaitedSecs = ?TIMEOUT_STATS / 1000,
        ?LOG_ERROR("dcp client (~p): stats timed out after ~p seconds."
                   " Waiting...",
            [Pid, WaitedSecs]),
        get_stats_reply(Pid, MRef)
    end.
119
% Waits for the `all_seqs' answer that is tagged with MRef. Whenever
% ?TIMEOUT_STATS ms pass without an answer, an error is logged and the wait
% starts over. The calling process exits when the monitored client dies.
-spec get_all_seqs_reply(pid(), reference()) -> term().
get_all_seqs_reply(Pid, MRef) ->
    receive
        {all_seqs, MRef, Seqs} ->
            Seqs;
        {'DOWN', MRef, process, Pid, DownReason} ->
            exit({dcp_client_died, Pid, DownReason})
    after ?TIMEOUT_STATS ->
        WaitedSecs = ?TIMEOUT_STATS / 1000,
        ?LOG_ERROR("dcp client (~p): dcp all seqs timed out after ~p seconds."
                   " Waiting...",
            [Pid, WaitedSecs]),
        get_all_seqs_reply(Pid, MRef)
    end.
133
% Sends a `get_stats' message to the client and waits for the answer. The
% client is monitored while waiting, so a dying client is noticed.
-spec get_stats(pid(), binary(), partition_id() | nil) -> term().
get_stats(Pid, Name, PartId) ->
    Monitor = erlang:monitor(process, Pid),
    ReplyTo = {Monitor, self()},
    Pid ! {get_stats, Name, PartId, ReplyTo},
    Stats = get_stats_reply(Pid, Monitor),
    erlang:demonitor(Monitor, [flush]),
    Stats.
141
142
% Asynchronous variant of get_seqs: the request is only posted to the client,
% nothing is awaited here. The answer is delivered later to the calling
% process as a message of the form:
% {all_seqs, nil, StatsResponse}.
-spec get_seqs_async(pid()) -> ok.
get_seqs_async(Pid) ->
    Caller = self(),
    Pid ! {get_all_seqs, {nil, Caller}},
    ok.
151
% Fetches the sequence numbers of all partitions. With `nil' as second
% argument the full result is returned, otherwise it is filtered down to the
% given (sorted) partition ids.
-spec get_seqs(pid(), ordsets:ordset(partition_id()) | nil) ->
    {ok, partition_seqs()} | {error, term()}.
get_seqs(Pid, SortedPartIds) ->
    case get_all_seqs(Pid) of
        {ok, AllSeqs} when SortedPartIds =:= nil ->
            {ok, AllSeqs};
        {ok, AllSeqs} ->
            Wanted = couch_set_view_util:filter_seqs(SortedPartIds, AllSeqs),
            {ok, Wanted};
        {error, _} = Failure ->
            Failure
    end.
167
% Sends a `get_all_seqs' message to the client and waits for the answer. The
% client is monitored while waiting, so a dying client is noticed.
-spec get_all_seqs(pid()) -> term().
get_all_seqs(Pid) ->
    Monitor = erlang:monitor(process, Pid),
    ReplyTo = {Monitor, self()},
    Pid ! {get_all_seqs, ReplyTo},
    Seqs = get_all_seqs_reply(Pid, Monitor),
    erlang:demonitor(Monitor, [flush]),
    Seqs.
175
% Requests the "vbucket-details" stats of one partition and returns the value
% stored under the key "vb_<PartId>:num_items" as an integer.
-spec get_num_items(pid(), partition_id()) ->
    {ok, non_neg_integer()} | {error, not_my_vbucket}.
get_num_items(Pid, PartId) ->
    case get_stats(Pid, <<"vbucket-details">>, PartId) of
        {ok, Stats} ->
            PartIdBin = list_to_binary(integer_to_list(PartId)),
            StatKey = <<"vb_", PartIdBin/binary, ":num_items">>,
            {_, NumItemsBin} = lists:keyfind(StatKey, 1, Stats),
            {ok, list_to_integer(binary_to_list(NumItemsBin))};
        {error, {?DCP_STATUS_NOT_MY_VBUCKET, _}} ->
            {error, not_my_vbucket}
    end.
189
190
% Requests the failover log of one partition from the client (default
% gen_server call timeout).
-spec get_failover_log(pid(), partition_id()) ->
    {error, no_failover_log_found | dcp_status()} |
    {ok, partition_version()}.
get_failover_log(Pid, PartId) ->
    Request = {get_failover_log, PartId},
    gen_server:call(Pid, Request).
196
197
% Asks the client for the next event of the stream identified by ReqId and
% waits for it. The client is monitored while waiting.
-spec get_stream_event(pid(), request_id()) ->
    {atom(), #dcp_doc{}} | {'error', term()}.
get_stream_event(Pid, ReqId) ->
    Monitor = erlang:monitor(process, Pid),
    Caller = self(),
    Pid ! {get_stream_event, ReqId, Caller},
    Event = get_stream_event_get_reply(Pid, ReqId, Monitor),
    erlang:demonitor(Monitor, [flush]),
    Event.
206
% Waits for the `stream_event' message of the given request id. Whenever
% ?TIMEOUT ms pass without one, the client is told to log the delay
% (`print_log') and the wait starts over. The calling process exits when the
% monitored client dies.
get_stream_event_get_reply(Pid, ReqId, MRef) ->
    receive
        {stream_event, ReqId, Event} ->
            Event;
        {'DOWN', MRef, process, Pid, DownReason} ->
            exit({dcp_client_died, Pid, DownReason})
    after ?TIMEOUT ->
        Pid ! {print_log, ReqId},
        get_stream_event_get_reply(Pid, ReqId, MRef)
    end.
218
% Opens a stream for one partition and hands it to receive_events/4 together
% with CallbackFn and the accumulator.
%
% The partition versions are tried front to back: when the server does not
% know the stream for the current version (`vbucket_stream_not_found') the
% next version is tried; once the list is exhausted `{rollback, 0}' is
% returned. A temporary failure is retried with the same versions after a
% short sleep. An end sequence number below the start sequence number is
% raised to the start sequence number.
%
% On success CallbackFn is first called with the failover log
% (`{part_versions, {PartId, FailoverLog}}'), and the result is
% `{ok, Acc, FailoverLog}'.
-spec enum_docs_since(pid(), partition_id(), partition_version(), update_seq(),
                      update_seq(), 0..255, mutations_fold_fun(),
                      mutations_fold_acc()) ->
    {error, vbucket_stream_not_found |
            wrong_start_sequence_number |
            too_large_failover_log } |
    {rollback, update_seq()} |
    {ok, mutations_fold_acc(), partition_version()}.
enum_docs_since(_Pid, _PartId, [], _StartSeq, _EndSeq, _Flags, _CallbackFn,
        _InAcc) ->
    % No matching partition version found. Recreate the index from scratch
    {rollback, 0};
enum_docs_since(Pid, PartId, PartVersions, StartSeq, EndSeq0, Flags,
        CallbackFn, InAcc) ->
    [CurrentVersion | OlderVersions] = PartVersions,
    {PartUuid, _} = CurrentVersion,
    EndSeq =
        if
            EndSeq0 < StartSeq ->
                ?LOG_INFO("dcp client (~p): Expecting a rollback for partition ~p. "
                "Found start_seqno > end_seqno (~p > ~p).",
                [Pid, PartId, StartSeq, EndSeq0]),
                StartSeq;
            true ->
                EndSeq0
        end,
    case add_stream(Pid, PartId, PartUuid, StartSeq, EndSeq, Flags) of
        {error, _} = AddError ->
            AddError;
        {RequestId, {failoverlog, FailoverLog}} ->
            LogTooLarge = length(FailoverLog) > ?DCP_MAX_FAILOVER_LOG_SIZE,
            case LogTooLarge of
                true ->
                    {error, too_large_failover_log};
                false ->
                    Acc1 = CallbackFn(
                        {part_versions, {PartId, FailoverLog}}, InAcc),
                    case receive_events(Pid, RequestId, CallbackFn, Acc1) of
                        {ok, Acc2} ->
                            {ok, Acc2, FailoverLog};
                        Failure ->
                            Failure
                    end
            end;
        {_, {error, vbucket_stream_not_found}} ->
            % Unknown partition version, fall back to the next older one
            enum_docs_since(Pid, PartId, OlderVersions, StartSeq, EndSeq,
                Flags, CallbackFn, InAcc);
        {_, {error, vbucket_stream_tmp_fail}} ->
            ?LOG_INFO("dcp client (~p): Temporary failure on stream request "
                "on partition ~p. Retrying...", [Pid, PartId]),
            timer:sleep(100),
            enum_docs_since(Pid, PartId, PartVersions, StartSeq, EndSeq,
                Flags, CallbackFn, InAcc);
        {_, OtherResp} ->
            OtherResp
    end.
275
276
277% gen_server callbacks
278
% gen_server init callback. Connects to the DCP port on localhost (port and
% timeout are read from the "dcp" config section), authenticates, selects the
% bucket and opens the DCP connection. Afterwards a linked receive worker is
% spawned, the socket is handed over to it and the buffer size is negotiated.
% Any failing step stops the start-up with the reason of that step.
-spec init([binary() | non_neg_integer()]) ->
    {ok, #state{}} | {stop, sasl_auth_failed | closed | inet:posix()}.
init([Name, Bucket, AdmUser, AdmPasswd, BufferSize, Flags]) ->
    DcpTimeout = list_to_integer(
        couch_config:get("dcp", "connection_timeout")),
    DcpPort = list_to_integer(couch_config:get("dcp", "port")),
    SockOpts = [binary, {packet, raw}, {active, true}, {nodelay, true},
                {buffer, ?DCP_BUF_WINDOW}],
    case gen_tcp:connect("localhost", DcpPort, SockOpts, DcpTimeout) of
        {error, ConnectReason} ->
            {stop, {error, {dcp_socket_connect_failed, ConnectReason}}};
        {ok, SockPid} ->
            BufSocket = #bufsocket{sockpid = SockPid, sockbuf = <<>>},
            FreshState = #state{
                bufsocket = BufSocket,
                timeout = DcpTimeout,
                request_id = 0
            },
            % Auth as admin and select bucket for the connection
            case sasl_auth(AdmUser, AdmPasswd, FreshState) of
                {error, AuthReason} ->
                    {stop, AuthReason};
                {ok, AuthedState} ->
                    case select_bucket(Bucket, AuthedState) of
                        {error, SelectReason} ->
                            {stop, SelectReason};
                        {ok, BucketState} ->
                            % Store the meta information to reconnect
                            Args = [Name, Bucket, AdmUser, AdmPasswd,
                                    BufferSize, Flags],
                            ArgsState = BucketState#state{args = Args},
                            case open_connection(Name, Flags, ArgsState) of
                                {error, OpenReason} ->
                                    {stop, OpenReason};
                                {ok, ConnState} ->
                                    Parent = self(),
                                    process_flag(trap_exit, true),
                                    #state{bufsocket = WorkerSocket} = ConnState,
                                    WorkerPid = spawn_link(
                                        fun() ->
                                            receive_worker(WorkerSocket,
                                                DcpTimeout, Parent, [])
                                        end),
                                    % From now on socket messages go to the worker
                                    gen_tcp:controlling_process(
                                        ?SOCKET(BufSocket), WorkerPid),
                                    case set_buffer_size(ConnState, BufferSize) of
                                        {ok, ReadyState} ->
                                            {ok, ReadyState#state{
                                                worker_pid = WorkerPid}};
                                        {error, BufReason} ->
                                            exit(WorkerPid, shutdown),
                                            {stop, BufReason}
                                    end
                            end
                    end
            end
    end.
333
334
% Registers a new stream request. On success no reply is given here: the
% caller is answered once the response of the server has arrived. The snapshot
% range handed to add_new_stream starts and ends at StartSeq.
handle_call({add_stream, PartId, PartUuid, StartSeq, EndSeq, Flags},
        From, State) ->
    SnapshotSeqs = {StartSeq, StartSeq},
    StreamArgs = {PartId, PartUuid, StartSeq, EndSeq, SnapshotSeqs, Flags},
    case add_new_stream(StreamArgs, From, State) of
        {error, _} = Error ->
            {reply, Error, State};
        NewState ->
            {noreply, NewState}
    end;
347
% Drops the stream info of the partition and sends a stream close request.
% When sending worked the caller is parked as pending request and answered
% once the response has arrived; a send error is replied right away.
handle_call({remove_stream, PartId}, From, State) ->
    CleanedState = remove_stream_info(PartId, State),
    #state{
        bufsocket = BufSocket,
        request_id = RequestId
    } = CleanedState,
    CloseRequest = couch_dcp_consumer:encode_stream_close(PartId, RequestId),
    case bufsocket_send(BufSocket, CloseRequest) of
        ok ->
            Advanced = next_request_id(CleanedState),
            Pending = add_pending_request(
                Advanced, RequestId, {remove_stream, PartId}, From),
            {noreply, Pending};
        {error, _Reason} = SendError ->
            {reply, SendError, CleanedState}
    end;
364
% Replies with the partition ids of all active streams, in reverse order of
% the active_streams list.
handle_call(list_streams, _From, State) ->
    #state{active_streams = ActiveStreams} = State,
    PartIds = lists:map(fun({PartId, _, _}) -> PartId end, ActiveStreams),
    {reply, lists:reverse(PartIds), State};
371
% Sends a failover log request for the partition. When sending worked the
% caller is parked as pending request and answered once the response has
% arrived; anything else returned by the send is replied right away.
handle_call({get_failover_log, PartId}, From, State) ->
    #state{
        bufsocket = BufSocket,
        request_id = RequestId
    } = State,
    LogRequest = couch_dcp_consumer:encode_failover_log_request(
        PartId, RequestId),
    case bufsocket_send(BufSocket, LogRequest) of
        ok ->
            Advanced = next_request_id(State),
            Pending = add_pending_request(
                Advanced, RequestId, get_failover_log, From),
            {noreply, Pending};
        SendError ->
            {reply, SendError, State}
    end;
387
% Only used by unit test: replies with the total_buffer_size counter
handle_call(get_buffer_size, _From, State) ->
    #state{total_buffer_size = Size} = State,
    {reply, Size, State};

% Only used by unit test: resets the total_buffer_size counter and replies
% with the value it had before
handle_call(reset_buffer_size, _From, State) ->
    #state{total_buffer_size = OldSize} = State,
    {reply, OldSize, State#state{total_buffer_size = 0}};

% Only used by unit test: forgets all stored stream infos
handle_call(flush_old_streams_meta, _From, State) ->
    Flushed = State#state{stream_info = dict:new()},
    {reply, ok, Flushed};

% Only used by unit test: replies with the buffered socket
handle_call(get_socket, _From, State) ->
    #state{bufsocket = BufSocket} = State,
    {reply, BufSocket, State};

% Only used by unit test: replies with the size of the event queue of one
% stream
handle_call({get_buffer_size, RequestId}, _From, State) ->
    #state{stream_queues = StreamQueues} = State,
    {ok, {_, EvQueue}} = dict:find(RequestId, StreamQueues),
    {reply, get_queue_size(EvQueue, 0), State};

% Only used by unit test: replies with the size of a single event
handle_call({get_event_size, Event}, _From, State) ->
    {reply, get_event_size(Event), State}.
416
% A caller asks for the next event of a stream.
% - No queue for the request id: the caller gets `vbucket_stream_not_found'.
% - Queue is empty: the caller is registered as waiter, unless there already
%   is one (then it gets `event_request_already_exists').
% - An event is queued: the buffer ack is handled first, then the event is
%   dequeued and sent to the caller. After a `stream_end' or `error' event the
%   queue of the request is removed. If the ack fails the event stays queued
%   and the caller gets the error.
handle_info({get_stream_event, RequestId, From}, State) ->
    case stream_event_present(State, RequestId) of
        nil ->
            From ! {stream_event, RequestId, {error, vbucket_stream_not_found}},
            {noreply, State};
        false ->
            case add_stream_event_waiter(State, RequestId, From) of
                nil ->
                    AlreadyWaiting = {error, event_request_already_exists},
                    From ! {stream_event, RequestId, AlreadyWaiting},
                    {noreply, State};
                WaitingState ->
                    {noreply, WaitingState}
            end;
        true ->
            Event = peek_stream_event(State, RequestId),
            {Optype, _, _} = Event,
            {Reply, NewState} =
                case check_and_send_buffer_ack(State, RequestId, Event, mutation) of
                    {ok, AckedState} ->
                        % The dequeued event must be the one that was peeked
                        {DequeuedState, Event} =
                            dequeue_stream_event(AckedState, RequestId),
                        IsLast = Optype =:= stream_end orelse Optype =:= error,
                        FinalState =
                            case IsLast of
                                true ->
                                    remove_request_queue(DequeuedState, RequestId);
                                _ ->
                                    DequeuedState
                            end,
                        {remove_body_len(Event), FinalState};
                    {error, AckReason} ->
                        {{error, AckReason}, State}
                end,
            From ! {stream_event, RequestId, Reply},
            {noreply, NewState}
    end;
454
455
% A caller (see get_stats/3) asks for stats. From is `{MRef, CallerPid}'.
% When the request could be sent, the caller is parked as pending request and
% is answered once the response of the server has arrived.
% When sending fails, the error is delivered to the caller in the very same
% message format a regular answer would have. handle_info/2 must not return a
% `reply' tuple - that is only valid for handle_call/3 and would crash the
% gen_server with `bad_return_value'.
handle_info({get_stats, Stat, PartId, From}, State) ->
    #state{
        request_id = RequestId,
        bufsocket = BufSocket
    } = State,
    SeqStatRequest = couch_dcp_consumer:encode_stat_request(
        Stat, PartId, RequestId),
    case bufsocket_send(BufSocket, SeqStatRequest) of
        ok ->
            State2 = next_request_id(State),
            State3 = add_pending_request(State2, RequestId, get_stats, From),
            {noreply, State3};
        Error ->
            {MRef, Caller} = From,
            Caller ! {get_stats, MRef, Error},
            {noreply, State}
    end;
471
% A caller (see get_all_seqs/1 and get_seqs_async/1) asks for the sequence
% numbers of all partitions. From is `{MRef, CallerPid}', where MRef is `nil'
% for the asynchronous API.
% When the request could be sent, the caller is parked as pending request and
% is answered once the response of the server has arrived.
% When sending fails, the error is delivered to the caller in the very same
% message format a regular answer would have. handle_info/2 must not return a
% `reply' tuple - that is only valid for handle_call/3 and would crash the
% gen_server with `bad_return_value'.
handle_info({get_all_seqs, From}, State) ->
    #state{
        request_id = RequestId,
        bufsocket = BufSocket
    } = State,
    SeqStatRequest = couch_dcp_consumer:encode_all_seqs_request(RequestId),
    case bufsocket_send(BufSocket, SeqStatRequest) of
        ok ->
            State2 = next_request_id(State),
            State3 = add_pending_request(State2, RequestId, all_seqs, From),
            {noreply, State3};
        Error ->
            {MRef, Caller} = From,
            Caller ! {all_seqs, MRef, Error},
            {noreply, State}
    end;
486
% Handles a response message sent by the connection receive worker and
% forwards it to whoever waits for it. Responses without a matching pending
% request are ignored; otherwise the pending request is removed afterwards.
handle_info({stream_response, RequestId, Msg}, State) ->
    NewState =
        case find_pending_request(State, RequestId) of
            nil ->
                State;
            {ReqInfo, SendTo} ->
                Handled =
                    case ReqInfo of
                        {add_stream, PartId} ->
                            {SendToPid, _} = SendTo,
                            gen_server:reply(SendTo, Msg),
                            case Msg of
                                {_, {failoverlog, _}} ->
                                    % Stream accepted, set up its event queue
                                    add_request_queue(
                                        State, PartId, RequestId, SendToPid);
                                _ ->
                                    % Remove possible leak in stream_info
                                    remove_stream_info_by_reqid(RequestId, State)
                            end;
                        {remove_stream, PartId} ->
                            gen_server:reply(SendTo, Msg),
                            StreamReqId = find_stream_req_id(State, PartId),
                            case Msg of
                                ok ->
                                    remove_request_queue(State, StreamReqId);
                                {error, vbucket_stream_not_found} ->
                                    remove_request_queue(State, StreamReqId)
                            end;
                        % Server sent the response for the internal control
                        % request
                        {control_request, Size} ->
                            State#state{max_buffer_size = Size};
                        get_stats ->
                            {StatsRef, StatsCaller} = SendTo,
                            StatsCaller ! {get_stats, StatsRef, Msg},
                            State;
                        all_seqs ->
                            {SeqsRef, SeqsCaller} = SendTo,
                            SeqsCaller ! {all_seqs, SeqsRef, Msg},
                            State;
                        _ ->
                            gen_server:reply(SendTo, Msg),
                            State
                    end,
                remove_pending_request(Handled, RequestId)
        end,
    {noreply, NewState};
532
% Answers a no-op message of the server with a no-op response.
handle_info({stream_noop, RequestId}, State) ->
    #state{bufsocket = BufSocket} = State,
    NoOpResponse = couch_dcp_consumer:encode_noop_response(RequestId),
    % The result of the send is ignored on purpose: if the no-op response
    % fails two times, the server itself will close the connection
    bufsocket_send(BufSocket, NoOpResponse),
    {noreply, State};
542
% Handles an event sent by the connection receive worker.
% First the stream info is updated: it is erased on `stream_end' / `error',
% and refreshed for snapshot markers, mutations and deletions.
% Then the event is either handed to a waiting caller (after the buffer ack
% was handled) or put into the event queue of the stream.
handle_info({stream_event, RequestId, Event}, State) ->
    {Optype, Data, _Length} = Event,
    IsFinal = Optype =:= stream_end orelse Optype =:= error,
    TrackedState =
        case Optype of
            _ when IsFinal ->
                #state{stream_info = StreamData} = State,
                State#state{stream_info = dict:erase(RequestId, StreamData)};
            snapshot_marker ->
                store_snapshot_seq(RequestId, Data, State);
            snapshot_mutation ->
                store_snapshot_mutation(RequestId, Data, State);
            snapshot_deletion ->
                store_snapshot_mutation(RequestId, Data, State)
        end,
    case stream_event_waiters_present(TrackedState, RequestId) of
        true ->
            {PoppedState, Waiter} =
                remove_stream_event_waiter(TrackedState, RequestId),
            {Reply, NewState} =
                case check_and_send_buffer_ack(
                        PoppedState, RequestId, Event, mutation) of
                    {ok, AckedState} ->
                        FinalState =
                            case IsFinal of
                                true ->
                                    remove_request_queue(AckedState, RequestId);
                                _ ->
                                    AckedState
                            end,
                        {remove_body_len(Event), FinalState};
                    {error, AckReason} ->
                        % Keep the event, the waiter only learns about the error
                        Requeued = enqueue_stream_event(
                            PoppedState, RequestId, Event),
                        {{error, AckReason}, Requeued}
                end,
            Waiter ! {stream_event, RequestId, Reply},
            {noreply, NewState};
        false ->
            {noreply, enqueue_stream_event(TrackedState, RequestId, Event)};
        nil ->
            % We might have explicitly closed a stream using close_stream command
            % Before the server received close_stream message, it would have placed
            % some mutations in the network buffer queue. We still need to acknowledge
            % the mutations received.
            % NOTE(review): this branch continues from State, not from the
            % state with the updated stream info - confirm this is intended.
            {ok, ClosedAckState} =
                check_and_send_buffer_ack(State, RequestId, Event, mutation),
            {noreply, ClosedAckState}
    end;
591
% The receive worker died because of a connection error: log it, wait
% ?DCP_RETRY_TIMEOUT ms and restart the worker.
handle_info({'EXIT', Pid, {conn_error, Reason}},
        #state{worker_pid = Pid} = State) ->
    [Name, Bucket, _, _, _, _] = State#state.args,
    ?LOG_ERROR("dcp client (~s, ~s): dcp receive worker failed due to reason: ~p."
        " Restarting dcp receive worker...",
        [Bucket, Name, Reason]),
    timer:sleep(?DCP_RETRY_TIMEOUT),
    restart_worker(State);

% The receive worker died for any other reason: stop with the same reason.
handle_info({'EXIT', Pid, ExitReason}, #state{worker_pid = Pid} = State) ->
    {stop, ExitReason, State};
602
% A caller has been waiting ?TIMEOUT ms for a stream event (see
% get_stream_event_get_reply/3): log that, together with the details of the
% stream if its info is still known.
handle_info({print_log, ReqId}, State) ->
    [Name, Bucket, _, _, _, _] = State#state.args,
    WaitedSecs = ?TIMEOUT / 1000,
    case find_stream_info(ReqId, State) of
        nil ->
            ?LOG_ERROR(
                "dcp client (~s, ~s): Obtaining message from server timed out "
                "after ~p seconds [RequestId ~p]. Waiting...",
                [Bucket, Name, WaitedSecs, ReqId]);
        StreamInfo ->
            #stream_info{
                part_id = PartId,
                start_seq = Start,
                end_seq = End
            } = StreamInfo,
            ?LOG_ERROR("dcp client (~s, ~s): Obtaining mutation from server timed out "
                "after ~p seconds [RequestId ~p, PartId ~p, StartSeq ~p, EndSeq ~p]. Waiting...",
                [Bucket, Name, WaitedSecs, ReqId, PartId, Start, End])
    end,
    {noreply, State};
622
% A monitored process went down: look up the active stream that carries this
% monitor reference and drop its stream info and its request queue.
handle_info({'DOWN', MRef, process, _Pid, _Reason},
            #state{active_streams = ActiveStreams} = State) ->
    {_PartId, RequestId, MRef} = lists:keyfind(MRef, 3, ActiveStreams),
    WithoutInfo = remove_stream_info_by_reqid(RequestId, State),
    {noreply, remove_request_queue(WithoutInfo, RequestId)};
629
% Any other message is unexpected and stops the server.
handle_info(Unexpected, State) ->
    StopReason = {unexpected_info, Unexpected},
    {stop, StopReason, State}.
632
% The client does not support any cast: every cast stops the server.
-spec handle_cast(any(), #state{}) ->
    {stop, {unexpected_cast, any()}, #state{}}.
handle_cast(Unexpected, State) ->
    StopReason = {unexpected_cast, Unexpected},
    {stop, StopReason, State}.
637
638
% Shuts the receive worker down when the server terminates.
-spec terminate(any(), #state{}) -> ok.
terminate(_Reason, #state{worker_pid = WorkerPid}) ->
    exit(WorkerPid, shutdown),
    ok.
643
644
% No state conversion is needed on a code upgrade.
-spec code_change(any(), #state{}, any()) -> {ok, #state{}}.
code_change(_OldVsn, CurrentState, _Extra) ->
    {ok, CurrentState}.
648
649
% Builds the state shown by sys:get_status/1 and in crash reports.
% - Every stream queue is condensed to its length plus the result of
%   queue:peek_r/1, so that the output stays readable.
% - The admin password stored in `args' (fourth element, see init/1) is masked
%   so that the credential does not end up in logs.
format_status(_Opt, [_PDict, #state{stream_queues = StreamQueues,
                                    args = Args} = State]) ->
    TransformFn = fun(_Key, {Waiter, Queue}) ->
                     {Waiter, {queue:len(Queue), queue:peek_r(Queue)}}
                  end,
    RedactedArgs =
        case Args of
            [Name, Bucket, AdmUser, _AdmPasswd, BufferSize, Flags] ->
                [Name, Bucket, AdmUser, <<"*****">>, BufferSize, Flags];
            _ ->
                % Unknown shape, nothing that could be masked reliably
                Args
        end,
    State#state{
        stream_queues = dict:map(TransformFn, StreamQueues),
        args = RedactedArgs
    }.
655
656
657% Internal functions
658
% Authenticates the connection. Sends the SASL auth request, then reads the
% header and the body of the response. On success the returned state carries
% the advanced request id and the updated buffered socket.
-spec sasl_auth(binary(), binary(), #state{}) ->
    {ok, #state{}} | {error, sasl_auth_failed | closed | inet:posix()}.
sasl_auth(User, Passwd, State) ->
    #state{
        request_id = RequestId,
        bufsocket = BufSocket,
        timeout = DcpTimeout
    } = State,
    AuthRequest = couch_dcp_consumer:encode_sasl_auth(User, Passwd, RequestId),
    case bufsocket_send(BufSocket, AuthRequest) of
        {error, _} = SendError ->
            SendError;
        ok ->
            case bufsocket_recv(BufSocket, ?DCP_HEADER_LEN, DcpTimeout) of
                {error, _} = HeaderError ->
                    HeaderError;
                {ok, Header, SocketAfterHeader} ->
                    % RequestId is already bound: the response must belong to
                    % the request that was just sent
                    {sasl_auth, Status, RequestId, BodyLength} =
                        couch_dcp_consumer:parse_header(Header),
                    % Receive the body so that it is not mangled with the next
                    % request, we care about the status only though
                    case bufsocket_recv(SocketAfterHeader, BodyLength, DcpTimeout) of
                        {error, _} = BodyError ->
                            BodyError;
                        {ok, _, SocketAfterBody} ->
                            case Status of
                                ?DCP_STATUS_OK ->
                                    {ok, State#state{
                                        request_id = RequestId + 1,
                                        bufsocket = SocketAfterBody
                                    }};
                                ?DCP_STATUS_SASL_AUTH_FAILED ->
                                    {error, sasl_auth_failed}
                            end
                    end
            end
    end.
696
% Selects the bucket for the connection. Sends the select bucket request and
% reads the header of the response. On success the returned state carries the
% advanced request id and the updated buffered socket.
%
% A "not supported" error response counts as success (see the comment below).
% Every other error response is returned as plain `{error, Reason}', as
% promised by the spec and expected by init/1. The socket that
% parse_error_response hands back along with the error is dropped for that.
-spec select_bucket(binary(), #state{}) -> {ok, #state{}} | {error, term()}.
select_bucket(Bucket, State) ->
    #state{
        bufsocket = BufSocket,
        timeout = DcpTimeout,
        request_id = RequestId
    } = State,
    SelectBucket = couch_dcp_consumer:encode_select_bucket(Bucket, RequestId),
    case bufsocket_send(BufSocket, SelectBucket) of
        ok ->
            case bufsocket_recv(BufSocket, ?DCP_HEADER_LEN, DcpTimeout) of
                {ok, Header, BufSocket2} ->
                    % RequestId is already bound: the response must belong to
                    % the request that was just sent
                    {select_bucket, Status, RequestId, BodyLength} =
                        couch_dcp_consumer:parse_header(Header),
                    case Status of
                        ?DCP_STATUS_OK ->
                            {ok, State#state{
                                request_id = RequestId + 1,
                                bufsocket = BufSocket2
                            }};
                        _ ->
                            case parse_error_response(
                                    BufSocket2, DcpTimeout, BodyLength, Status) of
                                % When the authentication happened with bucket
                                % name and password, then the correct bucket is
                                % already selected. In this case a select bucket
                                % command returns "not supported".
                                {{error, not_supported}, BufSocket3} ->
                                    {ok, State#state{
                                        request_id = RequestId + 1,
                                        bufsocket = BufSocket3
                                    }};
                                {{error, _} = Error, _} ->
                                    Error
                            end
                    end;
                {error, _} = Error ->
                    Error
            end;
        {error, _} = Error ->
            Error
    end.
738
% Opens the named DCP connection. Sends the open connection request and reads
% the header of the response. On success the returned state carries the
% advanced request id and the updated buffered socket.
-spec open_connection(binary(), non_neg_integer(), #state{}) ->
    {ok, #state{}} | {error, term()}.
open_connection(Name, Flags, State) ->
    #state{
        request_id = RequestId,
        bufsocket = BufSocket,
        timeout = DcpTimeout
    } = State,
    OpenRequest = couch_dcp_consumer:encode_open_connection(
        Name, Flags, RequestId),
    case bufsocket_send(BufSocket, OpenRequest) of
        {error, _} = SendError ->
            SendError;
        ok ->
            case bufsocket_recv(BufSocket, ?DCP_HEADER_LEN, DcpTimeout) of
                {error, _} = RecvError ->
                    RecvError;
                {ok, Header, SocketAfterHeader} ->
                    % RequestId is already bound: the response must belong to
                    % the request that was just sent
                    {open_connection, RequestId} =
                        couch_dcp_consumer:parse_header(Header),
                    Connected = State#state{bufsocket = SocketAfterHeader},
                    {ok, next_request_id(Connected)}
            end
    end.
762
763
% Reads the body of a snapshot marker from the socket and returns its start
% sequence number, end sequence number and type, together with the updated
% buffered socket.
-spec receive_snapshot_marker(#bufsocket{}, timeout(),  size()) ->
    {ok, {update_seq(), update_seq(), non_neg_integer()}, #bufsocket{}} |
    {error, closed}.
receive_snapshot_marker(BufSocket, Timeout, BodyLength) ->
    case bufsocket_recv(BufSocket, BodyLength, Timeout) of
        {error, _} = RecvError ->
            RecvError;
        {ok, Body, RestSocket} ->
            {snapshot_marker, StartSeq, EndSeq, Type} =
                couch_dcp_consumer:parse_snapshot_marker(Body),
            Marker = {StartSeq, EndSeq, Type},
            {ok, Marker, RestSocket}
    end.
777
% Reads the body of a mutation from the socket and converts it into a
% #dcp_doc{} (with `deleted = false'). Returns the document together with the
% updated buffered socket.
-spec receive_snapshot_mutation(#bufsocket{}, timeout(), partition_id(), size(),
                                size(), size(), uint64(), dcp_data_type()) ->
    {#dcp_doc{}, #bufsocket{}} | {error, closed}.
receive_snapshot_mutation(BufSocket, Timeout, PartId, KeyLength, BodyLength,
        ExtraLength, Cas, DataType) ->
    case bufsocket_recv(BufSocket, BodyLength, Timeout) of
        {error, _} = RecvError ->
            RecvError;
        {ok, Body, RestSocket} ->
            {snapshot_mutation, Mutation} =
                couch_dcp_consumer:parse_snapshot_mutation(
                    KeyLength, Body, BodyLength, ExtraLength),
            #mutation{
                key = Key,
                value = Value,
                seq = Seq,
                rev_seq = RevSeq,
                flags = Flags,
                expiration = Expiration
            } = Mutation,
            Doc = #dcp_doc{
                id = Key,
                body = Value,
                data_type = DataType,
                partition = PartId,
                cas = Cas,
                rev_seq = RevSeq,
                seq = Seq,
                flags = Flags,
                expiration = Expiration,
                deleted = false
            },
            {Doc, RestSocket}
    end.
811
% Reads the body of a deletion (or expiration) message and turns it into a
% #dcp_doc{} with deleted = true. The XATTRs returned by the parser become
% the document body; flags and expiration are always 0 for deletions.
% Returns {Doc, UpdatedBufSocket}, or the receive error unchanged.
-spec receive_snapshot_deletion(#bufsocket{}, timeout(), partition_id(), size(),
                                size(), uint64(), dcp_data_type()) ->
                                        {#dcp_doc{}, #bufsocket{}} |
                                        {error, closed | inet:posix()}.
receive_snapshot_deletion(BufSocket, Timeout, PartId, KeyLength, BodyLength,
        Cas, DataType) ->
    case bufsocket_recv(BufSocket, BodyLength, Timeout) of
    {error, _} = RecvError ->
        RecvError;
    {ok, Payload, RestSocket} ->
        {snapshot_deletion, {Seq, RevSeq, Key, _Metadata, XATTRs}} =
            couch_dcp_consumer:parse_snapshot_deletion(KeyLength, Payload),
        Doc = #dcp_doc{
            id = Key,
            body = XATTRs,
            data_type = DataType,
            partition = PartId,
            cas = Cas,
            rev_seq = RevSeq,
            seq = Seq,
            flags = 0,
            expiration = 0,
            deleted = true
        },
        {Doc, RestSocket}
    end.
838
% Reads the body of a stream end message. The raw body is returned as the
% flag, paired with the updated buffered socket; a receive error is passed
% through unchanged.
-spec receive_stream_end(#bufsocket{}, timeout(), size()) ->
            {<<_:32>>, #bufsocket{}} | {error, closed | inet:posix()}.
receive_stream_end(BufSocket, Timeout, BodyLength) ->
    case bufsocket_recv(BufSocket, BodyLength, Timeout) of
    {error, _} = RecvError ->
        RecvError;
    {ok, EndFlag, RestSocket} ->
        {EndFlag, RestSocket}
    end.
848
849
% Returns the failover log as a list of 2-tuple pairs with
% partition UUID and sequence number, paired with the updated buffered
% socket.
%
% Failures are returned as a bare {error, Reason}, i.e. WITHOUT a socket:
%   no_failover_log_found - the response carried an empty body
%   Status                - the response status was not ?DCP_STATUS_OK
%   closed | inet:posix() - reading the body from the socket failed
%
% NOTE(review): for a non-OK status the body is not read here; confirm the
% server never attaches one, otherwise it stays in the receive buffer.
-spec receive_failover_log(#bufsocket{}, timeout(), char(), size()) ->
        {{'ok', list(partition_version())}, #bufsocket{}} |
        {error, no_failover_log_found | char() | closed | inet:posix()}.
receive_failover_log(_BufSocket, _Timeout, _Status, 0) ->
    {error, no_failover_log_found};
receive_failover_log(BufSocket, Timeout, ?DCP_STATUS_OK, BodyLength) ->
    case bufsocket_recv(BufSocket, BodyLength, Timeout) of
    {ok, Body, BufSocket2} ->
        {couch_dcp_consumer:parse_failover_log(Body), BufSocket2};
    {error, _} = Error ->
        Error
    end;
receive_failover_log(_BufSocket, _Timeout, Status, _BodyLength) ->
    {error, Status}.
868
% Reads the body of a rollback response and decodes it as a single sequence
% number of ?DCP_SIZES_BY_SEQ bits. Returns {ok, Seq, UpdatedBufSocket}, or
% the receive error unchanged.
-spec receive_rollback_seq(#bufsocket{}, timeout(), size()) ->
        {ok, update_seq(), #bufsocket{}} | {error, closed | inet:posix()}.
receive_rollback_seq(BufSocket, Timeout, BodyLength) ->
    case bufsocket_recv(BufSocket, BodyLength, Timeout) of
    {error, _} = RecvError ->
        RecvError;
    {ok, <<Seq:?DCP_SIZES_BY_SEQ>>, RestSocket} ->
        {ok, Seq, RestSocket}
    end.
878
879
% Reads the body of one stats response and parses it with
% couch_dcp_consumer:parse_stat/4 (the value length is BodyLength minus
% KeyLength).
%
% On a successful read the parse result is returned paired with the updated
% buffered socket: {ParseResult, BufSocket}. If the read itself fails, the
% bare {error, Reason} from the socket is returned instead (no socket).
% The shape of ParseResult in the spec is taken from the previous spec of
% this function - TODO confirm against couch_dcp_consumer:parse_stat/4.
-spec receive_stat(#bufsocket{}, timeout(), dcp_status(), size(), size()) ->
        {{ok, {binary(), binary()}} |
         {error, {dcp_status(), binary()}}, #bufsocket{}} |
        {error, closed | inet:posix()}.
receive_stat(BufSocket, Timeout, Status, BodyLength, KeyLength) ->
    case bufsocket_recv(BufSocket, BodyLength, Timeout) of
    {ok, Body, BufSocket2} ->
        ValueLength = BodyLength - KeyLength,
        Stat = couch_dcp_consumer:parse_stat(
            Body, Status, KeyLength, ValueLength),
        {Stat, BufSocket2};
    {error, _} = Error ->
        Error
    end.
893
% Reads the body of an "all sequence numbers" response and parses it with
% couch_dcp_consumer:parse_all_seqs/3.
%
% On a successful read the parse result is returned paired with the updated
% buffered socket: {ParseResult, BufSocket}. If the read itself fails, the
% bare {error, Reason} from the socket is returned instead (no socket).
% The shape of ParseResult in the spec is taken from the previous spec of
% this function - TODO confirm against couch_dcp_consumer:parse_all_seqs/3.
-spec receive_all_seqs(#bufsocket{}, timeout(), dcp_status(), size()) ->
        {{ok, list()} |
         {error, {dcp_status(), binary()}}, #bufsocket{}} |
        {error, closed | inet:posix()}.
receive_all_seqs(BufSocket, Timeout, Status, BodyLength) ->
    case bufsocket_recv(BufSocket, BodyLength, Timeout) of
    {ok, Body, BufSocket2} ->
        {couch_dcp_consumer:parse_all_seqs(Status, Body, []), BufSocket2};
    {error, _} = Error ->
        Error
    end.
905
% Pulls events of the stream RequestId from the client process Pid one at a
% time and folds them into the accumulator with CallbackFn:
%   stream_end      - stops and returns {ok, Acc}
%   error           - stops and returns {error, Data}
%   snapshot_marker - CallbackFn receives {snapshot_marker, Data}
%   anything else   - CallbackFn receives the bare Data
-spec receive_events(pid(), request_id(), mutations_fold_fun(),
                     mutations_fold_acc()) -> {ok, mutations_fold_acc()} |
                                              {error, term()}.
receive_events(Pid, RequestId, CallbackFn, InAcc) ->
    case get_stream_event(Pid, RequestId) of
    {stream_end, _Data} ->
        {ok, InAcc};
    {error, Reason} ->
        {error, Reason};
    {snapshot_marker, Marker} ->
        receive_events(Pid, RequestId, CallbackFn,
            CallbackFn({snapshot_marker, Marker}, InAcc));
    {_Optype, Data} ->
        receive_events(Pid, RequestId, CallbackFn, CallbackFn(Data, InAcc))
    end.
923
% Writes the given data to the raw TCP socket. Thin wrapper around
% gen_tcp:send/2 whose result is returned as is.
-spec socket_send(socket(), iodata()) ->
        ok | {error, closed | inet:posix()}.
socket_send(Sock, IoData) ->
    gen_tcp:send(Sock, IoData).
928
% Sends data over the socket wrapped by a #bufsocket{}. Only the sockpid
% field is used; the receive buffer is not touched by sending.
-spec bufsocket_send(#bufsocket{}, iodata()) ->
        ok | {error, closed | inet:posix()}.
bufsocket_send(#bufsocket{sockpid = Sock}, IoData) ->
    socket_send(Sock, IoData).
933
% Waits for the next message that the socket SockPid delivers to the
% calling process and translates it:
%   {tcp, SockPid, Data}         -> {ok, Data}
%   {tcp_closed, SockPid}        -> {error, closed}
%   {tcp_error, SockPid, Reason} -> {error, Reason}
% If nothing arrives within Timeout, {error, timeout} is returned. The
% timeout must be reported as an error: bufsocket_recv/3 retries after every
% {ok, Data}, so answering a timeout with {ok, <<>>} would make it spin
% forever without ever handing control back to its caller.
-spec socket_recv(socket(), timeout()) ->
        {ok, binary()} | {error, closed | timeout | inet:posix()}.
socket_recv(SockPid, Timeout) ->
    receive
    {tcp, SockPid, Data} ->
        {ok, Data};
    {tcp_closed, SockPid} ->
        {error, closed};
    {tcp_error, SockPid, Reason} ->
        {error, Reason}
    after Timeout ->
        {error, timeout}
    end.
947
% Returns exactly Length bytes from the buffered socket.
% A request for 0 bytes is answered immediately with an empty binary.
% Otherwise the bytes are served from the buffer when it already holds at
% least Length bytes; if it does not, more data is received from the socket,
% appended to the buffer, and the request is retried. The first receive
% error ends the call and is returned as is.
-spec bufsocket_recv(#bufsocket{}, size(), timeout()) ->
        {ok, binary(), #bufsocket{}} | {error, closed | inet:posix()}.
bufsocket_recv(BufSocket, 0, _Timeout) ->
    {ok, <<>>, BufSocket};
bufsocket_recv(#bufsocket{sockbuf = Buffered} = BufSocket, Length, Timeout) ->
    case Buffered of
    <<Wanted:Length/binary, Leftover/binary>> ->
        % Enough bytes are buffered already, no need to touch the socket
        {ok, Wanted, BufSocket#bufsocket{sockbuf = Leftover}};
    _ ->
        case socket_recv(?SOCKET(BufSocket), Timeout) of
        {error, Reason} ->
            {error, Reason};
        {ok, Chunk} ->
            Grown = <<Buffered/binary, Chunk/binary>>,
            bufsocket_recv(BufSocket#bufsocket{sockbuf = Grown},
                Length, Timeout)
        end
    end.
969
% Records a request that is waiting for its response. The entry is stored
% under RequestId as {ReqInfo, From}; an existing entry with the same id is
% overwritten.
-spec add_pending_request(#state{}, request_id(), term(), nil | {pid(), term()}) -> #state{}.
add_pending_request(#state{pending_requests = Pending} = State,
        RequestId, ReqInfo, From) ->
    State#state{
        pending_requests = dict:store(RequestId, {ReqInfo, From}, Pending)
    }.
977
% Forgets the pending request stored under RequestId. Removing an id that
% is not present leaves the state unchanged.
remove_pending_request(#state{pending_requests = Pending} = State, RequestId) ->
    State#state{
        pending_requests = dict:erase(RequestId, Pending)
    }.
984
985
% Looks up the pending request stored under RequestId. Returns the stored
% {ReqInfo, From} pair, or nil when there is no such request.
-spec find_pending_request(#state{}, request_id()) -> nil | {term(), nil | {pid(), term()}}.
find_pending_request(#state{pending_requests = Pending}, RequestId) ->
    case dict:find(RequestId, Pending) of
    {ok, Entry} ->
        Entry;
    error ->
        nil
    end.
997
% Advances the request id counter by one. The id has to fit into
% ?DCP_SIZES_OPAQUE bits, hence it wraps around to 0 once the incremented
% value would no longer fit.
-spec next_request_id(#state{}) -> #state{}.
next_request_id(#state{request_id = RequestId} = State) ->
    NextId = if
    RequestId + 1 < (1 bsl ?DCP_SIZES_OPAQUE) ->
        RequestId + 1;
    true ->
        0
    end,
    State#state{request_id = NextId}.
1007
% Tears down the bookkeeping of the stream identified by RequestId:
% the monitor stored with the stream is removed, the bytes of the events
% that are still queued (and hence will never be read) are accounted for
% the buffer ack, and the stream is dropped from active_streams and
% stream_queues. If the stream is unknown the state is returned unchanged.
%
% Crashes (badmatch) if sending a buffer ack fails.
-spec remove_request_queue(#state{}, request_id()) -> #state{}.
remove_request_queue(State, RequestId) ->
    #state{
       active_streams = ActiveStreams,
       stream_queues = StreamQueues
    } = State,
    case lists:keyfind(RequestId, 2, ActiveStreams) of
    {_PartId, RequestId, MRef} ->
        erlang:demonitor(MRef, [flush]),

        {ok, State1} =
            check_and_send_buffer_ack(State, RequestId, nil, remove_stream),

        ActiveStreams2 = lists:keydelete(RequestId, 2, ActiveStreams),

        State2 = case ActiveStreams2 of
        [] ->
            % All active streams have finished reading
            % Let us ack for remaining unacked bytes
            {ok, Acked} = send_buffer_ack(State1),
            Acked;
        _ ->
            % Keep State1: it carries the buffer size that was updated
            % for the discarded events of this stream.
            State1
        end,

        State2#state{
            active_streams = ActiveStreams2,
            stream_queues = dict:erase(RequestId, StreamQueues)
        };
    _ ->
        State
    end.
1040
1041
% Registers a new stream: the process Pid gets monitored, the stream is put
% in front of active_streams as {PartId, RequestId, MonitorRef}, and an
% empty event queue without a waiter is created for RequestId.
-spec add_request_queue(#state{}, partition_id(), request_id(), pid()) -> #state{}.
add_request_queue(#state{active_streams = Streams, stream_queues = Queues} = State,
        PartId, RequestId, Pid) ->
    Monitor = erlang:monitor(process, Pid),
    State#state{
        active_streams = [{PartId, RequestId, Monitor} | Streams],
        stream_queues = dict:store(RequestId, {nil, queue:new()}, Queues)
    }.
1057
1058
% Appends Event to the end of the event queue of the stream RequestId.
% The stream must exist, otherwise the call crashes with a badmatch.
-spec enqueue_stream_event(#state{}, request_id(), tuple()) -> #state{}.
enqueue_stream_event(#state{stream_queues = Queues} = State, RequestId, Event) ->
    {ok, {Waiter, Events}} = dict:find(RequestId, Queues),
    Entry = {Waiter, queue:in(Event, Events)},
    State#state{stream_queues = dict:store(RequestId, Entry, Queues)}.
1071
% Takes the oldest event out of the event queue of the stream RequestId.
% Both the stream and at least one queued event must exist, otherwise the
% call crashes with a badmatch.
-spec dequeue_stream_event(#state{}, request_id()) ->
                               {#state{}, tuple()}.
dequeue_stream_event(#state{stream_queues = Queues} = State, RequestId) ->
    {ok, {Waiter, Events}} = dict:find(RequestId, Queues),
    {{value, Oldest}, Remaining} = queue:out(Events),
    Queues2 = dict:store(RequestId, {Waiter, Remaining}, Queues),
    {State#state{stream_queues = Queues2}, Oldest}.
1085
% Returns the oldest queued event of the stream RequestId without removing
% it. Crashes with a badmatch if the stream is unknown or its queue is empty.
-spec peek_stream_event(#state{}, request_id()) -> tuple().
peek_stream_event(#state{stream_queues = Queues}, RequestId) ->
    {ok, {_Waiter, Events}} = dict:find(RequestId, Queues),
    {value, Oldest} = queue:peek(Events),
    Oldest.
1094
% Registers NewWaiter as the waiter of the stream RequestId. Only a single
% waiter is kept per stream: if one is registered already, nil is returned
% and nothing changes. The stream itself must exist.
-spec add_stream_event_waiter(#state{}, request_id(), term()) -> #state{} | nil.
add_stream_event_waiter(#state{stream_queues = Queues} = State,
        RequestId, NewWaiter) ->
    case dict:find(RequestId, Queues) of
    {ok, {nil, Events}} ->
        Queues2 = dict:store(RequestId, {NewWaiter, Events}, Queues),
        State#state{stream_queues = Queues2};
    {ok, {_Registered, _Events}} ->
        nil
    end.
1110
1111
% Tells whether the stream RequestId has at least one queued event.
% Returns nil when the stream is unknown.
-spec stream_event_present(#state{}, request_id()) -> nil | true | false.
stream_event_present(#state{stream_queues = Queues}, RequestId) ->
    case dict:find(RequestId, Queues) of
    {ok, {_Waiter, Events}} ->
        not queue:is_empty(Events);
    error ->
        nil
    end.
1123
1124
% Tells whether a waiter is registered for the stream RequestId.
% Returns nil when the stream is unknown.
-spec stream_event_waiters_present(#state{}, request_id()) -> nil | true | false.
stream_event_waiters_present(#state{stream_queues = Queues}, RequestId) ->
    case dict:find(RequestId, Queues) of
    {ok, {nil, _Events}} ->
        false;
    {ok, {_Waiter, _Events}} ->
        true;
    error ->
        nil
    end.
1136
1137
% Clears the waiter of the stream RequestId and returns it together with
% the updated state. The returned waiter is nil if none was registered.
% The stream must exist, otherwise the call crashes with a badmatch.
-spec remove_stream_event_waiter(#state{}, request_id()) -> {#state{}, term()}.
remove_stream_event_waiter(#state{stream_queues = Queues} = State, RequestId) ->
    {ok, {Waiter, Events}} = dict:find(RequestId, Queues),
    Queues2 = dict:store(RequestId, {nil, Events}, Queues),
    {State#state{stream_queues = Queues2}, Waiter}.
1148
1149
% Returns the request id of the active stream of partition PartId, or nil
% when no stream is active for that partition.
-spec find_stream_req_id(#state{}, partition_id()) -> request_id() | nil.
find_stream_req_id(#state{active_streams = Streams}, PartId) ->
    case lists:keyfind(PartId, 1, Streams) of
    false ->
        nil;
    {PartId, ReqId, _MRef} ->
        ReqId
    end.
1161
% Consumes the body of an error response (its content is discarded) and
% maps the response status to an error term with status_to_error/1.
%
% On a successful read the result is the pair {ErrorTerm, BufSocket}, where
% ErrorTerm is the {error, _} tuple built from Status. If reading the body
% fails, the bare {error, Reason} from the socket is returned (no socket).
-spec parse_error_response(#bufsocket{}, timeout(), integer(), integer()) ->
        {{'error', atom() | {'status', integer()}}, #bufsocket{}} |
        {'error', closed | inet:posix()}.
parse_error_response(BufSocket, Timeout, BodyLength, Status) ->
    case bufsocket_recv(BufSocket, BodyLength, Timeout) of
    {ok, _, BufSocket2} ->
        {status_to_error(Status), BufSocket2};
    {error, _} = Error ->
        Error
    end.
1172
% Translates a DCP response status into an error tuple. Statuses without a
% dedicated reason are reported as {error, {status, Status}}.
-spec status_to_error(integer()) -> {'error', atom() | {'status', integer()}}.
status_to_error(Status) ->
    Reason = case Status of
    ?DCP_STATUS_KEY_NOT_FOUND ->
        vbucket_stream_not_found;
    ?DCP_STATUS_ERANGE ->
        wrong_start_sequence_number;
    ?DCP_STATUS_KEY_EEXISTS ->
        vbucket_stream_already_exists;
    ?DCP_STATUS_NOT_MY_VBUCKET ->
        server_not_my_vbucket;
    ?DCP_STATUS_TMP_FAIL ->
        vbucket_stream_tmp_fail;
    ?DCP_STATUS_NOT_SUPPORTED ->
        not_supported;
    _ ->
        {status, Status}
    end,
    {error, Reason}.
1188
1189
% The worker process for handling the downstream pipe of a dcp connection.
% Reads and parses downstream messages and sends them to the gen_server
% process (Parent).
%
% Each iteration reads one header (without timeout), then - depending on the
% kind of message - its body (with Timeout), and ends in one of two actions:
%   done - the assembled message is sent to Parent and the accumulator is
%          reset
%   true - nothing is sent yet; the loop continues with the accumulator
%          (used to collect the entries of a multi-part stats response)
%
% The function never returns. A failure to read a header, or to read the
% body of an event or error response, terminates the worker with
% exit({conn_error, Reason}). A header that parse_header/1 maps to an
% unknown tuple crashes the worker with a case_clause.
-spec receive_worker(#bufsocket{}, timeout(), pid(), list()) -> no_return().
receive_worker(BufSocket, Timeout, Parent, MsgAcc0) ->
    case bufsocket_recv(BufSocket, ?DCP_HEADER_LEN, infinity) of
    {ok, Header, BufSocket2} ->
        {Action, MsgAcc, BufSocket3} =
        case couch_dcp_consumer:parse_header(Header) of
        {control_request, Status, RequestId} ->
            {done, {stream_response, RequestId, {RequestId, Status}},
                BufSocket2};
        {noop_request, RequestId} ->
            {done, {stream_noop, RequestId}, BufSocket2};
        {buffer_ack, ?DCP_STATUS_OK, _RequestId} ->
            % Nothing to report. Keep the accumulator as it is, so that an
            % ack arriving in the middle of a stats response does not drop
            % the entries collected so far.
            {true, MsgAcc0, BufSocket2};
        {stream_request, Status, RequestId, BodyLength} ->
            {Response, BufSocket5} = case Status of
            ?DCP_STATUS_OK ->
                case receive_failover_log(
                    BufSocket2, Timeout, Status, BodyLength) of
                {{ok, FailoverLog}, BufSocket4} ->
                    {{failoverlog, FailoverLog}, BufSocket4};
                Error ->
                    {Error, BufSocket2}
                end;
            ?DCP_STATUS_ROLLBACK ->
                case receive_rollback_seq(
                    BufSocket2, Timeout, BodyLength) of
                {ok, RollbackSeq, BufSocket4} ->
                    {{rollback, RollbackSeq}, BufSocket4};
                Error ->
                    {Error, BufSocket2}
                end;
            _ ->
                body_or_exit(parse_error_response(
                    BufSocket2, Timeout, BodyLength, Status))
            end,
            {done, {stream_response, RequestId, {RequestId, Response}},
                BufSocket5};
        {failover_log, Status, RequestId, BodyLength} ->
            case receive_failover_log(
                BufSocket2, Timeout, Status, BodyLength) of
            {Response, #bufsocket{} = BufSocket5} ->
                {done, {stream_response, RequestId, Response}, BufSocket5};
            {error, _} = Error ->
                % Failures come back without a socket; report the error
                % and carry on with the socket we already have.
                {done, {stream_response, RequestId, Error}, BufSocket2}
            end;
        {stream_close, Status, RequestId, BodyLength} ->
            {Response, BufSocket5} = case Status of
            ?DCP_STATUS_OK ->
                {ok, BufSocket2};
            _ ->
                body_or_exit(parse_error_response(
                    BufSocket2, Timeout, BodyLength, Status))
            end,
            {done, {stream_response, RequestId, Response}, BufSocket5};
        {stats, Status, RequestId, BodyLength, KeyLength} ->
            case BodyLength of
            0 ->
                case Status of
                ?DCP_STATUS_OK ->
                    % An empty body terminates the stats response
                    StatAcc = lists:reverse(MsgAcc0),
                    {done, {stream_response, RequestId, {ok, StatAcc}},
                        BufSocket2};
                % Some errors might not contain a body
                _ ->
                    Error = {error, {Status, <<>>}},
                    {done, {stream_response, RequestId, Error}, BufSocket2}
                end;
            _ ->
                case receive_stat(
                    BufSocket2, Timeout, Status, BodyLength, KeyLength) of
                {{ok, Stat}, BufSocket5} ->
                    {true, [Stat | MsgAcc0], BufSocket5};
                % The body was read, but it describes an error
                {{error, _} = Error, BufSocket5} ->
                    {done, {stream_response, RequestId, Error}, BufSocket5};
                % The body could not be read from the socket
                {error, _} = Error ->
                    {done, {stream_response, RequestId, Error}, BufSocket2}
                end
            end;
        {snapshot_marker, _PartId, RequestId, BodyLength} ->
            case receive_snapshot_marker(BufSocket2, Timeout, BodyLength) of
            {ok, SnapshotMarker, BufSocket5} ->
                {done, {stream_event, RequestId,
                    {snapshot_marker, SnapshotMarker, BodyLength}},
                    BufSocket5};
            {error, Reason} ->
                exit({conn_error, Reason})
            end;
        {snapshot_mutation, PartId, RequestId, KeyLength, BodyLength,
                ExtraLength, Cas, DataType} ->
            {Mutation, BufSocket5} = body_or_exit(receive_snapshot_mutation(
                BufSocket2, Timeout, PartId, KeyLength, BodyLength, ExtraLength,
                Cas, DataType)),
            {done, {stream_event, RequestId,
                    {snapshot_mutation, Mutation, BodyLength}}, BufSocket5};
        % For the indexer and XDCR there's no difference between a deletion
        % and an expiration. In both cases the items should get removed.
        % Hence the same code can be used after the initial header
        % parsing (the body is the same).
        {OpCode, PartId, RequestId, KeyLength, BodyLength, Cas, DataType} when
                OpCode =:= snapshot_deletion orelse
                OpCode =:= snapshot_expiration ->
            {Deletion, BufSocket5} = body_or_exit(receive_snapshot_deletion(
                BufSocket2, Timeout, PartId, KeyLength, BodyLength,
                Cas, DataType)),
            {done, {stream_event, RequestId,
                {snapshot_deletion, Deletion, BodyLength}}, BufSocket5};
        {stream_end, PartId, RequestId, BodyLength} ->
            {Flag, BufSocket5} = body_or_exit(receive_stream_end(BufSocket2,
                Timeout, BodyLength)),
            {done, {stream_event, RequestId, {stream_end,
                {RequestId, PartId, Flag}, BodyLength}}, BufSocket5};
        {all_seqs, Status, RequestId, BodyLength} ->
            {Resp, BufSocket5} = body_or_exit(receive_all_seqs(
                BufSocket2, Timeout, Status, BodyLength)),
            {done, {stream_response, RequestId, Resp}, BufSocket5}
        end,
        case Action of
        done ->
            Parent ! MsgAcc,
            receive_worker(BufSocket3, Timeout, Parent, []);
        true ->
            receive_worker(BufSocket3, Timeout, Parent, MsgAcc)
        end;
    {error, Reason} ->
        exit({conn_error, Reason})
    end.

% Passes through the {Result, #bufsocket{}} pair returned by a body reader.
% A bare {error, Reason} means that the read itself failed; it must not be
% destructured as such a pair, so the worker is terminated the same way as
% for a failed header read.
-spec body_or_exit({term(), #bufsocket{}} | {error, term()}) ->
                                                    {term(), #bufsocket{}}.
body_or_exit({_Result, #bufsocket{}} = Read) ->
    Read;
body_or_exit({error, Reason}) ->
    exit({conn_error, Reason}).
1307
% Check if we need to send buffer ack to server and send it
% if required.
%
% The number of not yet acknowledged bytes (total_buffer_size) is increased
% depending on Type:
%   mutation      - by the size of Event
%   remove_stream - by the size of all events still queued for RequestId
%                   (nothing is added if the stream has no queue)
% If the new total exceeds max_buffer_size * ?DCP_BUFFER_ACK_THRESHOLD an
% ack for the whole amount is sent and the counter restarts at 0; otherwise
% the new total is merely stored. A total of 0 leaves the state unchanged.
% Error events ({error, _, _}) are never accounted for.
%
% Returns {ok, State}, or {error, Reason} if sending the ack failed.
-spec check_and_send_buffer_ack(#state{}, request_id(), tuple() | nil, atom()) ->
                        {ok, #state{}} | {error, closed | inet:posix()}.
check_and_send_buffer_ack(State, _RequestId, {error, _, _}, _Type) ->
    {ok, State};

check_and_send_buffer_ack(State, RequestId, Event, Type) ->
    #state{
        bufsocket = BufSocket,
        max_buffer_size = MaxBufSize,
        stream_queues = StreamQueues,
        total_buffer_size = Size
    } = State,
    Size2 = case Type of
    remove_stream ->
        case dict:find(RequestId, StreamQueues) of
        error ->
            Size;
        {ok, {_, EvQueue}} ->
            get_queue_size(EvQueue, Size)
        end;
    mutation ->
        Size + get_event_size(Event)
    end,
    MaxAckSize = MaxBufSize * ?DCP_BUFFER_ACK_THRESHOLD,
    if
    Size2 > MaxAckSize ->
        BufferAckRequest = couch_dcp_consumer:encode_buffer_request(0, Size2),
        case bufsocket_send(BufSocket, BufferAckRequest) of
        ok ->
            {ok, State#state{total_buffer_size = 0}};
        % Hand the send error on as is; wrapping it once more would
        % yield {error, {error, Reason}}.
        {error, _Reason} = Error ->
            Error
        end;
    Size2 == 0 ->
        {ok, State};
    true ->
        {ok, State#state{total_buffer_size = Size2}}
    end.
1359
% Unconditionally acknowledges all bytes counted in total_buffer_size to
% the server. On success the counter is reset to 0; a send error is
% returned unchanged and leaves the state as it was.
-spec send_buffer_ack(#state{}) ->
            {ok, #state{}} | {error, closed | inet:posix()}.
send_buffer_ack(#state{bufsocket = BufSocket, total_buffer_size = Unacked} = State) ->
    AckRequest = couch_dcp_consumer:encode_buffer_request(0, Unacked),
    case bufsocket_send(BufSocket, AckRequest) of
    {error, _Reason} = SendError ->
        SendError;
    ok ->
        {ok, State#state{total_buffer_size = 0}}
    end.
1374
1375
% Sends a control request that sets the connection buffer size to Size.
% On success the request id counter is advanced, the request is recorded as
% pending (without a caller to reply to) under the id it was sent with, and
% Size is remembered as max_buffer_size.
-spec set_buffer_size(#state{}, non_neg_integer()) -> {ok ,#state{}} |
                                            {error, closed | inet:posix()}.
set_buffer_size(#state{bufsocket = BufSocket, request_id = RequestId} = State,
        Size) ->
    Request = couch_dcp_consumer:encode_control_request(
        RequestId, connection, Size),
    case bufsocket_send(BufSocket, Request) of
    {error, Reason} ->
        {error, Reason};
    ok ->
        Advanced = next_request_id(State),
        Tracked = add_pending_request(
            Advanced, RequestId, {control_request, Size}, nil),
        {ok, Tracked#state{max_buffer_size = Size}}
    end.
1393
% Adds the sizes (see get_event_size/1) of all events in EvQueue to Size
% and returns the sum. An empty queue yields Size unchanged.
-spec get_queue_size(queue(), non_neg_integer()) -> non_neg_integer().
get_queue_size(EvQueue, Size) ->
    lists:foldl(fun(Item, Total) ->
        Total + get_event_size(Item)
    end, Size, queue:to_list(EvQueue)).
1403
% Size of an event as used for buffer acknowledgement: the body length
% stored as third element of the event plus the fixed header length.
-spec get_event_size({atom(), #dcp_doc{}, non_neg_integer()}) -> non_neg_integer().
get_event_size({_Kind, _Payload, BodyBytes}) ->
    ?DCP_HEADER_LEN + BodyBytes.
1407
% Removes the stream info that belongs to the active stream of partition
% PartId. Nothing changes if no stream is active for that partition.
-spec remove_stream_info(partition_id(), #state{}) -> #state{}.
remove_stream_info(PartId, #state{active_streams = Streams} = State) ->
    case lists:keyfind(PartId, 1, Streams) of
    {_PartId, RequestId, _MRef} ->
        remove_stream_info_by_reqid(RequestId, State);
    false ->
        State
    end.
1419
% Removes the stream info stored under RequestId. Removing an id that is
% not present leaves the state unchanged.
-spec remove_stream_info_by_reqid(request_id(), #state{}) -> #state{}.
remove_stream_info_by_reqid(RequestId, State) ->
    Remaining = dict:erase(RequestId, State#state.stream_info),
    State#state{stream_info = Remaining}.
1425
% Stores the parameters a stream was requested with as a #stream_info{}
% under RequestId. An existing entry for the same id is overwritten.
-spec insert_stream_info(partition_id(), request_id(), uuid(), non_neg_integer(),
                        non_neg_integer(), #state{}, non_neg_integer()) -> #state{}.
insert_stream_info(PartId, RequestId, PartUuid, StartSeq, EndSeq, State, Flags) ->
    Info = #stream_info{
        part_id = PartId,
        part_uuid = PartUuid,
        start_seq = StartSeq,
        end_seq = EndSeq,
        flags = Flags
    },
    State#state{
        stream_info = dict:store(RequestId, Info, State#state.stream_info)
    }.
1439
% Looks up the #stream_info{} stored under RequestId. Returns nil when
% there is none.
-spec find_stream_info(request_id(), #state{}) -> #stream_info{} | nil.
find_stream_info(RequestId, #state{stream_info = Streams}) ->
    case dict:find(RequestId, Streams) of
    error ->
        nil;
    {ok, Info} ->
        Info
    end.
1451
% Sends a stream request for partition PartId using the current request id.
% On success the stream parameters are remembered (see
% insert_stream_info/7), the request id counter is advanced and the request
% is recorded as pending for the caller From; the updated state is returned.
% If sending fails, the {error, Reason} of the socket is returned and no
% bookkeeping is done.
-spec add_new_stream({partition_id(), uuid(), update_seq(), update_seq(),
        {update_seq(), update_seq()}, 0..255},
        {pid(), string()}, #state{}) -> #state{} | {error, closed | inet:posix()}.
add_new_stream({PartId, PartUuid, StartSeq, EndSeq,
    {SnapSeqStart, SnapSeqEnd}, Flags}, From, State) ->
    #state{
        bufsocket = BufSocket,
        request_id = RequestId
    } = State,
    StreamRequest = couch_dcp_consumer:encode_stream_request(
        PartId, RequestId, Flags, StartSeq, EndSeq, PartUuid, SnapSeqStart, SnapSeqEnd),
    case bufsocket_send(BufSocket, StreamRequest) of
    ok ->
        State2 = insert_stream_info(PartId, RequestId, PartUuid, StartSeq, EndSeq,
            State, Flags),
        State3 = next_request_id(State2),
        add_pending_request(State3, RequestId, {add_stream, PartId}, From);
    % Hand the send error on as is; wrapping it once more would yield
    % {error, {error, Reason}}.
    {error, _Reason} = Error ->
        Error
    end.
1472
% Re-initializes the connection by running init/1 with the stored arguments
% and, on success, swaps the new socket and worker pid into the current
% state. All requests that were pending on the old connection are dropped
% and their callers are told {error, dcp_conn_closed}:
%   get_stats / all_seqs - by a message tagged with the stored reference
%   control_request      - nobody is waiting, nothing is sent
%   anything else        - by gen_server:reply/2
% In addition an error event is sent to this process for every stream that
% has stream info, so that it is handled like a regular stream event.
%
% Returns {noreply, NewState}, or {stop, Reason, State} if init/1 stops.
-spec restart_worker(#state{}) -> {noreply, #state{}} |
                                  {stop, term(), #state{}}.
restart_worker(State) ->
    #state{
        args = Args,
        pending_requests = PendingRequests,
        stream_info = StreamInfo
    } = State,
    case init(Args) of
    {stop, Reason} ->
        {stop, Reason, State};
    {ok, State2} ->
        #state{
            bufsocket = BufSocket,
            worker_pid = WorkerPid
        } = State2,
        % Replace the socket
        State3 = State#state{
            bufsocket = BufSocket,
            pending_requests = dict:new(),
            worker_pid = WorkerPid
        },
        Error = {error, dcp_conn_closed},
        % Folding with a dummy accumulator: only the side effects matter,
        % there is no need to build a new dict.
        ok = dict:fold(fun(_RequestId, {ReqInfo, SendTo}, ok) ->
            case ReqInfo of
            get_stats ->
                {MRef, From} = SendTo,
                From ! {get_stats, MRef, Error},
                ok;
            all_seqs ->
                {MRef, From} = SendTo,
                From ! {all_seqs, MRef, Error},
                ok;
            {control_request, _} ->
                ok;
            _ ->
                gen_server:reply(SendTo, Error),
                ok
            end
        end, ok, PendingRequests),
        ok = dict:fold(fun(RequestId, _Value, ok) ->
            Event = {error, dcp_conn_closed, 0},
            self() ! {stream_event, RequestId, Event},
            ok
        end, ok, StreamInfo),
        {noreply, State3}
    end.
1513
% Remembers the sequence range {StartSeq, EndSeq} of a snapshot marker in
% the stream info of RequestId. The marker type is ignored. Nothing changes
% if there is no stream info for RequestId.
-spec store_snapshot_seq(request_id(), {update_seq(), update_seq(),
                            non_neg_integer()}, #state{}) -> #state{}.
store_snapshot_seq(RequestId, {StartSeq, EndSeq, _Type},
        #state{stream_info = Streams} = State) ->
    case dict:find(RequestId, Streams) of
    error ->
        State;
    {ok, Info} ->
        Info2 = Info#stream_info{snapshot_seq = {StartSeq, EndSeq}},
        State#state{stream_info = dict:store(RequestId, Info2, Streams)}
    end.
1529
% Sets start_seq in the stream info of RequestId to the sequence number of
% the given document. Nothing changes if there is no stream info for
% RequestId.
-spec store_snapshot_mutation(request_id(), #dcp_doc{}, #state{}) -> #state{}.
store_snapshot_mutation(RequestId, #dcp_doc{seq = Seq},
        #state{stream_info = Streams} = State) ->
    case dict:find(RequestId, Streams) of
    {ok, Info} ->
        Info2 = Info#stream_info{start_seq = Seq},
        State#state{stream_info = dict:store(RequestId, Info2, Streams)};
    error ->
        State
    end.
1546
% Strips the body length (third element) from an event, leaving
% {Type, Data}. The payload is either a plain tuple or a #dcp_doc{}; note
% that the spec needs the type tuple(), the bare word tuple would denote
% the atom 'tuple'.
-spec remove_body_len({atom(), tuple() | #dcp_doc{}, non_neg_integer()}) ->
                                                {atom(), tuple() | #dcp_doc{}}.
remove_body_len({Type, Data, _BodyLength}) ->
    {Type, Data}.
1551