1% Licensed under the Apache License, Version 2.0 (the "License"); you may not
2% use this file except in compliance with the License. You may obtain a copy of
3% the License at
4%
5%   http://www.apache.org/licenses/LICENSE-2.0
6%
7% Unless required by applicable law or agreed to in writing, software
8% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10% License for the specific language governing permissions and limitations under
11% the License.
12
13-module(couch_dcp_client).
14-behaviour(gen_server).
15
16% Public API
17-export([start/5]).
18-export([add_stream/6, get_seqs/2, get_num_items/2,
19    get_failover_log/2]).
20-export([get_stream_event/2, remove_stream/2, list_streams/1]).
21-export([enum_docs_since/8, restart_worker/1]).
22-export([get_seqs_async/1]).
23
24% gen_server callbacks
25-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2, code_change/3]).
26-export([format_status/2]).
27
28-include("couch_db.hrl").
29-include_lib("couch_dcp/include/couch_dcp.hrl").
30-include_lib("couch_dcp/include/couch_dcp_typespecs.hrl").
31-define(MAX_BUF_SIZE, 10485760).
32-define(TIMEOUT, 60000).
33-define(TIMEOUT_STATS, 2000).
34-define(DCP_RETRY_TIMEOUT, 2000).
35-define(DCP_BUF_WINDOW, 65535).
36-define(SOCKET(BufSocket), BufSocket#bufsocket.sockpid).
37
% Types of the fold callback and accumulator taken by enum_docs_since/8.
-type mutations_fold_fun() :: fun().
-type mutations_fold_acc() :: any().

% A socket together with a local receive buffer.
-record(bufsocket, {
    % Socket returned by gen_tcp:connect/4 in init/1; nil when not connected.
    sockpid = nil                   :: socket() | nil,
    % NOTE(review): presumably bytes already read from the socket but not yet
    % consumed by bufsocket_recv/3 (not visible here) -- confirm.
    sockbuf = <<>>                  :: binary()
}).

% gen_server state of the DCP client.
-record(state, {
    % Connection to the DCP server.
    bufsocket = nil                 :: #bufsocket{} | nil,
    % Socket timeout; init/1 sets it from the "dcp"/"connection_timeout" config.
    timeout = 5000                  :: timeout(),
    % Id to put in the next outgoing request; advanced after each request
    % (RequestId + 1 or next_request_id/1).
    request_id = 0                  :: request_id(),
    % Requests awaiting a server response; presumably keyed by request id
    % (see add_pending_request/4, find_pending_request/2).
    pending_requests = dict:new()   :: dict(),
    % Stream request id -> {Waiter, EventQueue} (see format_status/2).
    stream_queues = dict:new()      :: dict(),
    % {PartId, _} entries for the open streams (see list_streams).
    active_streams = []             :: list(),
    % Linked receive worker; init/1 makes it the socket's controlling process.
    worker_pid                      :: pid(),
    % Updated from the server's response to the internal control request.
    max_buffer_size = ?MAX_BUF_SIZE :: integer(),
    % NOTE(review): presumably bytes received but not yet acknowledged to the
    % server -- confirm against check_and_send_buffer_ack/4.
    total_buffer_size = 0           :: non_neg_integer(),
    % Stream request id -> #stream_info{} (used by print_log in handle_info/2).
    stream_info = dict:new()        :: dict(),
    % [Name, Bucket, AdmUser, AdmPasswd, BufferSize] as given to init/1, kept
    % so the connection can be re-established. Contains the admin password.
    args = []                       :: list()
}).

% Parameters of a stream request, kept per stream request id.
-record(stream_info, {
    part_id = 0                     :: partition_id(),
    part_uuid = 0                   :: uuid(),
    start_seq = 0                   :: update_seq(),
    end_seq = 0                     :: update_seq(),
    % {Start, End} of the last snapshot; presumably updated by
    % store_snapshot_seq/3 (not visible here) -- confirm.
    snapshot_seq = {0, 0}           :: {update_seq(), update_seq()},
    flags = 0                       :: non_neg_integer()
}).
68
% This gen_server implements a DCP client with vbucket stream multiplexing.
% The client spawns a worker process to handle all response messages and event
% messages received from the DCP server. For simplicity, these are classified into
% two types of messages: stream_response and stream_event. For any DCP request, the
% corresponding response message is called a stream_response. A vbucket stream request,
% however, works like a subscription: once the response confirming that the stream was
% initiated arrives, the server starts sending vbucket events. These messages are
% called stream_event.
%
% The user can request a stream using the add_stream() API, and the corresponding
% events can be received using the get_stream_event() API.
79
80
81% Public API
82
% Start a DCP client linked to the caller. The connection is named Name and
% authenticates as AdmUser/AdmPasswd against Bucket. BufferSize is handed to
% init/1, which applies it with set_buffer_size/2.
-spec start(binary(), binary(), binary(), binary(), non_neg_integer()) -> {ok, pid()} | ignore |
                                  {error, {already_started, pid()} | term()}.
start(Name, Bucket, AdmUser, AdmPasswd, BufferSize) ->
    InitArgs = [Name, Bucket, AdmUser, AdmPasswd, BufferSize],
    gen_server:start_link(?MODULE, InitArgs, []).
87
% Open a stream on partition PartId covering StartSeq..EndSeq. The call
% returns {error, Reason} or {RequestId, Response}; RequestId identifies the
% stream when its events are fetched later.
-spec add_stream(pid(), partition_id(), uuid(), update_seq(),
    update_seq(), dcp_data_type()) -> {error, term()} | {request_id(), term()}.
add_stream(Pid, PartId, PartUuid, StartSeq, EndSeq, Flags) ->
    Request = {add_stream, PartId, PartUuid, StartSeq, EndSeq, Flags},
    gen_server:call(Pid, Request, ?TIMEOUT).
93
94
% Close the stream on partition PartId. The call waits (up to ?TIMEOUT) for
% the server's answer to the close request.
-spec remove_stream(pid(), partition_id()) ->
                           'ok' | {'error', term()}.
remove_stream(Pid, PartId) ->
    Request = {remove_stream, PartId},
    gen_server:call(Pid, Request, ?TIMEOUT).
99
100
% Return the partition ids of the currently active streams.
-spec list_streams(pid()) -> list().
list_streams(Pid) ->
    Request = list_streams,
    gen_server:call(Pid, Request).
104
105
% Wait for the {get_stats, MRef, Reply} answer and return Reply. If the
% client dies first, exit with {dcp_client_died, Pid, Reason}. There is no
% overall deadline: each ?TIMEOUT_STATS ms without an answer logs an error
% and the wait continues.
-spec get_stats_reply(pid(), reference()) -> term().
get_stats_reply(Pid, MRef) ->
    receive
    {get_stats, MRef, Reply} ->
        Reply;
    {'DOWN', MRef, process, Pid, Reason} ->
        exit({dcp_client_died, Pid, Reason})
    after ?TIMEOUT_STATS ->
        WaitedSecs = ?TIMEOUT_STATS / 1000,
        ?LOG_ERROR("dcp client (~p): stats timed out after ~p seconds."
                   " Waiting...", [Pid, WaitedSecs]),
        get_stats_reply(Pid, MRef)
    end.
119
% Wait for the {all_seqs, MRef, Reply} answer and return Reply. If the
% client dies first, exit with {dcp_client_died, Pid, Reason}. Each
% ?TIMEOUT_STATS ms without an answer logs an error and the wait continues.
-spec get_all_seqs_reply(pid(), reference()) -> term().
get_all_seqs_reply(Pid, MRef) ->
    receive
    {all_seqs, MRef, Reply} ->
        Reply;
    {'DOWN', MRef, process, Pid, Reason} ->
        exit({dcp_client_died, Pid, Reason})
    after ?TIMEOUT_STATS ->
        WaitedSecs = ?TIMEOUT_STATS / 1000,
        ?LOG_ERROR("dcp client (~p): dcp all seqs timed out after ~p seconds."
                   " Waiting...", [Pid, WaitedSecs]),
        get_all_seqs_reply(Pid, MRef)
    end.
133
% Fetch the stats group Name (for partition PartId, or nil) from the client.
% The client is monitored while waiting, so its death is reported to the
% caller instead of leaving it blocked.
-spec get_stats(pid(), binary(), partition_id() | nil) -> term().
get_stats(Pid, Name, PartId) ->
    Caller = self(),
    Ref = erlang:monitor(process, Pid),
    Pid ! {get_stats, Name, PartId, {Ref, Caller}},
    Result = get_stats_reply(Pid, Ref),
    erlang:demonitor(Ref, [flush]),
    Result.
141
142
% Asynchronous variant of get_seqs/2: it returns ok at once. The result is
% later delivered to the calling process as the message
% {all_seqs, nil, StatsResponse}.
-spec get_seqs_async(pid()) -> ok.
get_seqs_async(Pid) ->
    ReplyTo = {nil, self()},
    Pid ! {get_all_seqs, ReplyTo},
    ok.
151
% Return the sequence numbers of all partitions. When SortedPartIds is not
% nil, the result is narrowed to those partitions with
% couch_set_view_util:filter_seqs/2.
-spec get_seqs(pid(), ordsets:ordset(partition_id()) | nil) ->
        {ok, partition_seqs()} | {error, term()}.
get_seqs(Pid, SortedPartIds) ->
    case get_all_seqs(Pid) of
    {error, _} = Error ->
        Error;
    {ok, AllSeqs} when SortedPartIds =:= nil ->
        {ok, AllSeqs};
    {ok, AllSeqs} ->
        {ok, couch_set_view_util:filter_seqs(SortedPartIds, AllSeqs)}
    end.
167
% Synchronously request the sequence numbers of all partitions, monitoring
% the client while waiting for the answer.
-spec get_all_seqs(pid()) -> term().
get_all_seqs(Pid) ->
    Ref = erlang:monitor(process, Pid),
    Request = {get_all_seqs, {Ref, self()}},
    Pid ! Request,
    Result = get_all_seqs_reply(Pid, Ref),
    erlang:demonitor(Ref, [flush]),
    Result.
175
% Return the item count of partition PartId, read from the
% "vb_<PartId>:num_items" entry of the "vbucket-details" stats group.
% Returns {error, not_my_vbucket} when the server reports that status. Any
% other reply, or a missing stat entry, crashes the caller.
-spec get_num_items(pid(), partition_id()) ->
                          {ok, non_neg_integer()} | {error, not_my_vbucket}.
get_num_items(Pid, PartId) ->
    case get_stats(Pid, <<"vbucket-details">>, PartId) of
    {error, {?DCP_STATUS_NOT_MY_VBUCKET, _}} ->
        {error, not_my_vbucket};
    {ok, Stats} ->
        PartIdBin = list_to_binary(integer_to_list(PartId)),
        StatKey = <<"vb_", PartIdBin/binary, ":num_items">>,
        {_, NumItemsBin} = lists:keyfind(StatKey, 1, Stats),
        {ok, list_to_integer(binary_to_list(NumItemsBin))}
    end.
189
190
% Fetch the failover log of partition PartId from the DCP server.
%
% Like add_stream/6 and remove_stream/2, this call waits for a round trip to
% the DCP server. It therefore uses the same ?TIMEOUT rather than
% gen_server:call/2's 5 second default, which made a slow server crash the
% caller well before the other API calls would give up.
-spec get_failover_log(pid(), partition_id()) ->
                             {error, no_failover_log_found | dcp_status()} |
                             {ok, partition_version()}.
get_failover_log(Pid, PartId) ->
    gen_server:call(Pid, {get_failover_log, PartId}, ?TIMEOUT).
196
197
% Fetch the next event of the stream identified by ReqId. The call blocks
% until an event (or an error) is available. It exits with
% {dcp_client_died, Pid, Reason} if the client dies while waiting.
-spec get_stream_event(pid(), request_id()) ->
                             {atom(), #dcp_doc{}} | {'error', term()}.
get_stream_event(Pid, ReqId) ->
    Monitor = erlang:monitor(process, Pid),
    Pid ! {get_stream_event, ReqId, self()},
    Event = get_stream_event_get_reply(Pid, ReqId, Monitor),
    erlang:demonitor(Monitor, [flush]),
    Event.
206
% Wait for {stream_event, ReqId, Reply} and return Reply. Each ?TIMEOUT ms
% without it, ask the client to log the stalled stream ({print_log, ReqId})
% and keep waiting.
get_stream_event_get_reply(Pid, ReqId, MRef) ->
    receive
    {stream_event, ReqId, Reply} ->
        Reply;
    {'DOWN', MRef, process, Pid, Reason} ->
        exit({dcp_client_died, Pid, Reason})
    after ?TIMEOUT ->
        Pid ! {print_log, ReqId},
        get_stream_event_get_reply(Pid, ReqId, MRef)
    end.
218
% Fold the changes of partition PartId between StartSeq and EndSeq0 through
% CallbackFn.
%
% Partition versions are tried in list order; each is a tuple whose first
% element is the partition UUID. When the server does not know a UUID
% (vbucket_stream_not_found), the next version is tried. When the list is
% exhausted, {rollback, 0} is returned. A temporary failure is retried with
% the same versions after 100 ms.
%
% On success CallbackFn is first called with
% {part_versions, {PartId, FailoverLog}}, then receive_events/4 drives it
% over the stream. The result is {ok, Acc, FailoverLog}. Any other stream
% response (e.g. {rollback, Seq}) is returned unchanged.
-spec enum_docs_since(pid(), partition_id(), partition_version(), update_seq(),
                     update_seq(), 0..255, mutations_fold_fun(),
                     mutations_fold_acc()) ->
                            {error, vbucket_stream_not_found |
                             wrong_start_sequence_number |
                             too_large_failover_log } |
                            {rollback, update_seq()} |
                            {ok, mutations_fold_acc(), partition_version()}.
enum_docs_since(_, _, [], _, _, _, _, _) ->
    % No matching partition version found. Recreate the index from scratch
    {rollback, 0};
enum_docs_since(Pid, PartId, PartVersions, StartSeq, EndSeq0, Flags,
        CallbackFn, InAcc) ->
    [CurrentVersion | OlderVersions] = PartVersions,
    {PartUuid, _} = CurrentVersion,
    % An end before the start is clamped to the start; the server is then
    % expected to answer with a rollback.
    EndSeq = if
    EndSeq0 >= StartSeq ->
        EndSeq0;
    true ->
        ?LOG_INFO("dcp client (~p): Expecting a rollback for partition ~p. "
        "Found start_seqno > end_seqno (~p > ~p).",
        [Pid, PartId, StartSeq, EndSeq0]),
        StartSeq
    end,
    case add_stream(Pid, PartId, PartUuid, StartSeq, EndSeq, Flags) of
    {error, _} = AddError ->
        AddError;
    {RequestId, {failoverlog, FailoverLog}} ->
        case length(FailoverLog) > ?DCP_MAX_FAILOVER_LOG_SIZE of
        true ->
            {error, too_large_failover_log};
        false ->
            Acc = CallbackFn({part_versions, {PartId, FailoverLog}}, InAcc),
            case receive_events(Pid, RequestId, CallbackFn, Acc) of
            {ok, FinalAcc} ->
                {ok, FinalAcc, FailoverLog};
            Other ->
                Other
            end
        end;
    {_, {error, vbucket_stream_not_found}} ->
        enum_docs_since(Pid, PartId, OlderVersions, StartSeq, EndSeq,
            Flags, CallbackFn, InAcc);
    {_, {error, vbucket_stream_tmp_fail}} ->
        ?LOG_INFO("dcp client (~p): Temporary failure on stream request "
            "on partition ~p. Retrying...", [Pid, PartId]),
        timer:sleep(100),
        enum_docs_since(Pid, PartId, PartVersions, StartSeq, EndSeq,
            Flags, CallbackFn, InAcc);
    {_, Resp} ->
        Resp
    end.
275
276
277% gen_server callbacks
278
% gen_server init: connect to the local DCP port, SASL-authenticate, select
% Bucket and open a DCP connection named Name. On success a linked receive
% worker is spawned and made the socket's controlling process, then the
% flow-control buffer size is set to BufferSize. Every failure returns
% {stop, Reason}.
% NOTE(review): the spec does not list the
% {stop, {error, {dcp_socket_connect_failed, _}}} returned when connecting fails.
-spec init([binary() | non_neg_integer()]) -> {ok, #state{}} |
                   {stop, sasl_auth_failed | closed | inet:posix()}.
init([Name, Bucket, AdmUser, AdmPasswd, BufferSize]) ->
    % Connection timeout and port come from the "dcp" config section.
    DcpTimeout = list_to_integer(
        couch_config:get("dcp", "connection_timeout")),
    DcpPort = list_to_integer(couch_config:get("dcp", "port")),

    case gen_tcp:connect("localhost", DcpPort,
        [binary, {packet, raw}, {active, true}, {nodelay, true},
                 {buffer, ?DCP_BUF_WINDOW}], DcpTimeout) of
    {ok, SockPid} ->
        BufSocket = #bufsocket{sockpid = SockPid, sockbuf = <<>>},
        State = #state{
            bufsocket = BufSocket,
            timeout = DcpTimeout,
            request_id = 0
        },
        % Auth as admin and select bucket for the connection
        case sasl_auth(AdmUser, AdmPasswd, State) of
        {ok, State2} ->
            case select_bucket(Bucket, State2) of
            {ok, State3} ->
                % Store the meta information to reconnect
                Args = [Name, Bucket, AdmUser, AdmPasswd, BufferSize],
                State4 = State3#state{args = Args},
                case open_connection(Name, State4) of
                {ok, State5} ->
                    Parent = self(),
                    % Trap exits so that the worker's termination arrives as
                    % an 'EXIT' message (handled in handle_info/2) instead of
                    % killing this server outright.
                    process_flag(trap_exit, true),
                    #state{bufsocket = BufSocket2} = State5,
                    WorkerPid = spawn_link(
                        fun() ->
                            receive_worker(BufSocket2, DcpTimeout, Parent, [])
                        end),
                    % The socket is {active, true}: hand ownership to the
                    % worker so it receives the socket's data messages.
                    % NOTE(review): the return value is not checked.
                    gen_tcp:controlling_process(?SOCKET(BufSocket), WorkerPid),
                    case set_buffer_size(State5, BufferSize) of
                    {ok, State6} ->
                        {ok, State6#state{worker_pid = WorkerPid}};
                    {error, Reason} ->
                        % Don't leave the linked worker running when init fails.
                        exit(WorkerPid, shutdown),
                        {stop, Reason}
                    end;
                {error, Reason} ->
                    {stop, Reason}
                end;
            {error, Reason} ->
                {stop, Reason}
            end;
        {error, Reason} ->
            {stop, Reason}
        end;
    {error, Reason} ->
        {stop, {error, {dcp_socket_connect_failed, Reason}}}
    end.
333
334
% handle_call/3. remove_stream and get_failover_log send a request on the
% socket and park the caller in pending_requests; the caller is answered
% later with gen_server:reply/2 from handle_info({stream_response, ...}).
% add_stream delegates to add_new_stream/3, which presumably does the same.
% The remaining clauses reply immediately.

% Add caller to the request queue and wait for gen_server to reply on response arrival
handle_call({add_stream, PartId, PartUuid, StartSeq, EndSeq, Flags},
        From, State) ->
    % The initial snapshot range is {StartSeq, StartSeq}.
    Snapshot = {StartSeq, StartSeq},
    StreamReq = {PartId, PartUuid, StartSeq, EndSeq, Snapshot, Flags},
    case add_new_stream(StreamReq, From, State) of
    {error, _} = Error ->
        {reply, Error, State};
    NewState ->
        {noreply, NewState}
    end;

handle_call({remove_stream, PartId}, From, State) ->
    StateNoInfo = remove_stream_info(PartId, State),
    #state{request_id = ReqId, bufsocket = Sock} = StateNoInfo,
    CloseReq = couch_dcp_consumer:encode_stream_close(PartId, ReqId),
    case bufsocket_send(Sock, CloseReq) of
    {error, _Reason} = Error ->
        {reply, Error, StateNoInfo};
    ok ->
        NextState = next_request_id(StateNoInfo),
        {noreply, add_pending_request(NextState, ReqId, {remove_stream, PartId}, From)}
    end;

handle_call(list_streams, _From, #state{active_streams = Streams} = State) ->
    % Partition ids come back in reverse order of active_streams.
    PartIds = lists:reverse(lists:map(fun({PartId, _}) -> PartId end, Streams)),
    {reply, PartIds, State};

handle_call({get_failover_log, PartId}, From,
        #state{request_id = ReqId, bufsocket = Sock} = State) ->
    Request = couch_dcp_consumer:encode_failover_log_request(PartId, ReqId),
    case bufsocket_send(Sock, Request) of
    ok ->
        NextState = next_request_id(State),
        {noreply, add_pending_request(NextState, ReqId, get_failover_log, From)};
    Error ->
        {reply, Error, State}
    end;

% Only used by unit test
handle_call(get_buffer_size, _From, #state{total_buffer_size = TotalSize} = State) ->
    {reply, TotalSize, State};

% Only used by unit test
handle_call(reset_buffer_size, _From, #state{total_buffer_size = TotalSize} = State) ->
    {reply, TotalSize, State#state{total_buffer_size = 0}};

% Only used by unit test
handle_call(flush_old_streams_meta, _From, State) ->
    {reply, ok, State#state{stream_info = dict:new()}};

% Only used by unit test
handle_call(get_socket, _From, #state{bufsocket = Sock} = State) ->
    {reply, Sock, State};

% Only used by unit test
handle_call({get_buffer_size, RequestId}, _From,
        #state{stream_queues = Queues} = State) ->
    {ok, {_Waiter, EventQueue}} = dict:find(RequestId, Queues),
    {reply, get_queue_size(EventQueue, 0), State};

% Only used by unit test
handle_call({get_event_size, Event}, _From, State) ->
    {reply, get_event_size(Event), State}.
416
% handle_info/2 serves three kinds of messages:
%  * API requests that bypass gen_server:call/3 (get_stream_event, get_stats,
%    get_all_seqs). They are answered with plain messages to the Pid, or to
%    the {MRef, Pid} pair, carried in the request -- never with {reply, ...},
%    which is not a valid handle_info/2 return value.
%  * Messages from the linked receive worker: stream_response, stream_noop,
%    stream_event and its 'EXIT' signal.
%  * {print_log, ReqId}, sent by get_stream_event/2 while a caller waits.
% Any other message stops the server with {unexpected_info, Msg}.

% If a stream event for this requestId is present in the queue,
% dequeue it and reply back to the caller.
% Else, put the caller into the stream queue waiter list
handle_info({get_stream_event, RequestId, From}, State) ->
    case stream_event_present(State, RequestId) of
    true ->
        Event = peek_stream_event(State, RequestId),
        {Optype, _, _} = Event,
        {Msg, State6} = case check_and_send_buffer_ack(State, RequestId, Event, mutation) of
        {ok, State3} ->
            % Event is already bound: this asserts the dequeued event is the
            % one peeked above.
            {State4, Event} = dequeue_stream_event(State3, RequestId),
            % On stream_end or error the stream's request queue is removed.
            State5 = case Optype =:= stream_end orelse Optype =:= error of
            true ->
                remove_request_queue(State4, RequestId);
            _ ->
                State4
            end,
            {remove_body_len(Event), State5};
        {error, Reason} ->
            % The event stays queued; the caller only receives the error.
            {{error, Reason}, State}
        end,
        From ! {stream_event, RequestId, Msg},
        {noreply, State6};
    false ->
        case add_stream_event_waiter(State, RequestId, From) of
        nil ->
            Reply = {error, event_request_already_exists},
            From ! {stream_event, RequestId, Reply},
            {noreply, State};
        State2 ->
            {noreply, State2}
        end;
    nil ->
        Reply = {error, vbucket_stream_not_found},
        From ! {stream_event, RequestId, Reply},
        {noreply, State}
    end;


handle_info({get_stats, Stat, PartId, From}, State) ->
    #state{
       request_id = RequestId,
       bufsocket = BufSocket
    } = State,
    SeqStatRequest = couch_dcp_consumer:encode_stat_request(
        Stat, PartId, RequestId),
    case bufsocket_send(BufSocket, SeqStatRequest) of
    ok ->
        State2 = next_request_id(State),
        State3 = add_pending_request(State2, RequestId, get_stats, From),
        {noreply, State3};
    Error ->
        % handle_info/2 cannot return {reply, ...}. Deliver the error to the
        % caller blocked in get_stats_reply/2, in the same message format
        % the server response would have used.
        {MRef, Caller} = From,
        Caller ! {get_stats, MRef, Error},
        {noreply, State}
    end;

handle_info({get_all_seqs, From}, State) ->
    #state{
       request_id = RequestId,
       bufsocket = BufSocket
    } = State,
    SeqStatRequest = couch_dcp_consumer:encode_all_seqs_request(RequestId),
    case bufsocket_send(BufSocket, SeqStatRequest) of
    ok ->
        State2 = next_request_id(State),
        State3 = add_pending_request(State2, RequestId, all_seqs, From),
        {noreply, State3};
    Error ->
        % Same as get_stats: answer with a message rather than an invalid
        % {reply, ...}. MRef is nil for get_seqs_async/1 callers, matching
        % the documented {all_seqs, nil, Response} format.
        {MRef, Caller} = From,
        Caller ! {all_seqs, MRef, Error},
        {noreply, State}
    end;

% Handle response message send by connection receiver worker
% Reply back to waiting callers
handle_info({stream_response, RequestId, Msg}, State) ->
    State3 = case find_pending_request(State, RequestId) of
    {_, nil} ->
        % NOTE(review): a request registered without a caller is neither
        % processed nor removed from pending_requests here -- confirm how
        % set_buffer_size/2 registers its control request.
        State;
    {ReqInfo, SendTo} ->
        State2 = case ReqInfo of
        {add_stream, PartId} ->
            gen_server:reply(SendTo, Msg),
            case Msg of
            {_, {failoverlog, _}} ->
                add_request_queue(State, PartId, RequestId);
            _ ->
                State
            end;
        {remove_stream, PartId} ->
            gen_server:reply(SendTo, Msg),
            StreamReqId = find_stream_req_id(State, PartId),
            case check_and_send_buffer_ack(State, StreamReqId, nil, remove_stream) of
            {ok, NewState} ->
                case Msg of
                ok ->
                    remove_request_queue(NewState, StreamReqId);
                {error, vbucket_stream_not_found} ->
                    remove_request_queue(NewState, StreamReqId);
                _ ->
                    NewState
                end;
            {error, Error} ->
                % exit/1, not throw/1: gen_server uses a thrown term as the
                % callback's return value, which turned this into a
                % misleading bad_return_value crash.
                exit({control_ack_failed, Error})
            end;
        % Server sent the response for the internal control request
        {control_request, Size} ->
            State#state{max_buffer_size = Size};
        get_stats ->
            {MRef, From} = SendTo,
            From ! {get_stats, MRef, Msg},
            State;
        all_seqs ->
            {MRef, From} = SendTo,
            From ! {all_seqs, MRef, Msg},
            State;
        _ ->
            gen_server:reply(SendTo, Msg),
            State
        end,
        remove_pending_request(State2, RequestId);
    nil ->
        State
    end,
    {noreply, State3};

% Respond with the no op message reply to server
handle_info({stream_noop, RequestId}, State) ->
    #state {
        bufsocket = BufSocket
    } = State,
    NoOpResponse = couch_dcp_consumer:encode_noop_response(RequestId),
    % if noop reponse fails two times, server it self will close the connection
    bufsocket_send(BufSocket, NoOpResponse),
    {noreply, State};

% Handle events send by connection receiver worker
% If there is a waiting caller for stream event, reply to them
% Else, queue the event into the stream queue
handle_info({stream_event, RequestId, Event}, State) ->
    {Optype, Data, _Length} = Event,
    State2 = case Optype of
    Optype when Optype =:= stream_end orelse Optype =:= error ->
        #state{
            stream_info = StreamData
        } = State,
        StreamData2 = dict:erase(RequestId, StreamData),
        State#state{stream_info = StreamData2};
    snapshot_marker ->
        store_snapshot_seq(RequestId, Data, State);
    snapshot_mutation ->
        store_snapshot_mutation(RequestId, Data, State);
    snapshot_deletion ->
        store_snapshot_mutation(RequestId, Data, State)
    end,
    case stream_event_waiters_present(State2, RequestId) of
    true ->
        {State3, Waiter} = remove_stream_event_waiter(State2, RequestId),
        {Msg, State6} = case check_and_send_buffer_ack(State3, RequestId, Event, mutation) of
        {ok, State4} ->
            State5 = case Optype =:= stream_end orelse Optype =:= error of
            true ->
                remove_request_queue(State4, RequestId);
            _ ->
                State4
            end,
            {remove_body_len(Event), State5};
        {error, Reason} ->
            % Keep the event so that a later get_stream_event can fetch it.
            State4 = enqueue_stream_event(State3, RequestId, Event),
            {{error, Reason}, State4}
        end,
        Waiter ! {stream_event, RequestId, Msg},
        {noreply, State6};
    false ->
        State3 = enqueue_stream_event(State2, RequestId, Event),
        {noreply, State3};
    nil ->
        % We might have explicitly closed a stream using close_stream command
        % Before the server received close_stream message, it would have placed
        % some mutations in the network buffer queue. We still need to acknowledge
        % the mutations received.
        {ok, State3} = check_and_send_buffer_ack(State, RequestId, Event, mutation),
        {noreply, State3}
    end;

handle_info({'EXIT', Pid, {conn_error, Reason}}, #state{worker_pid = Pid} = State) ->
    [Name, Bucket, _AdmUser, _AdmPasswd, _BufferSize] = State#state.args,
    ?LOG_ERROR("dcp client (~s, ~s): dcp receive worker failed due to reason: ~p."
        " Restarting dcp receive worker...",
        [Bucket, Name, Reason]),
    timer:sleep(?DCP_RETRY_TIMEOUT),
    restart_worker(State);

handle_info({'EXIT', Pid, Reason}, #state{worker_pid = Pid} = State) ->
    {stop, Reason, State};

handle_info({print_log, ReqId}, State) ->
    [Name, Bucket, _AdmUser, _AdmPasswd, _BufferSize] = State#state.args,
    case find_stream_info(ReqId, State) of
    nil ->
        ?LOG_ERROR(
            "dcp client (~s, ~s): Obtaining message from server timed out "
            "after ~p seconds [RequestId ~p]. Waiting...",
            [Bucket, Name, ?TIMEOUT / 1000, ReqId]);
    StreamInfo ->
        #stream_info{
           start_seq = Start,
           end_seq = End,
           part_id = PartId
        } = StreamInfo,
        ?LOG_ERROR("dcp client (~s, ~s): Obtaining mutation from server timed out "
            "after ~p seconds [RequestId ~p, PartId ~p, StartSeq ~p, EndSeq ~p]. Waiting...",
            [Bucket, Name, ?TIMEOUT / 1000, ReqId, PartId, Start, End])
    end,
    {noreply, State};

handle_info(Msg, State) ->
    {stop, {unexpected_info, Msg}, State}.
633
% Casts are not part of this server's protocol: any cast stops it.
-spec handle_cast(any(), #state{}) ->
                        {stop, {unexpected_cast, any()}, #state{}}.
handle_cast(Msg, State) ->
    StopReason = {unexpected_cast, Msg},
    {stop, StopReason, State}.
638
639
% Take the linked receive worker down together with the server.
-spec terminate(any(), #state{}) -> ok.
terminate(_Reason, #state{worker_pid = WorkerPid}) ->
    exit(WorkerPid, shutdown),
    ok.
644
645
% Hot code upgrade: the state is carried over unchanged.
-spec code_change(any(), #state{}, any()) -> {ok, #state{}}.
code_change(_OldVsn, OldState, _Extra) ->
    {ok, OldState}.
649
650
% Customise how the state appears in sys:get_status/1 output and in crash
% reports.
%  * Each stream queue is summarised as {Waiter, {Length, LastEvent}}, where
%    LastEvent is the queue:peek_r/1 result, instead of dumping every event.
%  * The admin password kept in `args` (for reconnecting) is masked, so
%    credentials never reach the logs.
format_status(_Opt, [_PDict, #state{stream_queues = StreamQueues, args = Args} = State]) ->
    TransformFn = fun(_Key, {Waiter, Queue}) ->
                     {Waiter, {queue:len(Queue), queue:peek_r(Queue)}}
                  end,
    RedactedArgs = case Args of
    [Name, Bucket, AdmUser, _AdmPasswd, BufferSize] ->
        [Name, Bucket, AdmUser, <<"*****">>, BufferSize];
    _ ->
        % Default [] (args not stored yet): nothing to hide.
        Args
    end,
    State#state{
        stream_queues = dict:map(TransformFn, StreamQueues),
        args = RedactedArgs
    }.
656
657
658% Internal functions
659
% SASL-authenticate the connection as User/Passwd. The whole response
% (header and body) is consumed so that it cannot mix with the next one. On
% success the state carries the next request id and the updated buffer.
% A status other than OK or SASL_AUTH_FAILED crashes with case_clause.
-spec sasl_auth(binary(), binary(), #state{}) -> {ok, #state{}} |
                           {error, sasl_auth_failed | closed | inet:posix()}.
sasl_auth(User, Passwd, State) ->
    #state{
        bufsocket = Sock,
        timeout = Timeout,
        request_id = ReqId
    } = State,
    AuthReq = couch_dcp_consumer:encode_sasl_auth(User, Passwd, ReqId),
    case bufsocket_send(Sock, AuthReq) of
    {error, _} = SendError ->
        SendError;
    ok ->
        case bufsocket_recv(Sock, ?DCP_HEADER_LEN, Timeout) of
        {error, _} = HeaderError ->
            HeaderError;
        {ok, Header, Sock2} ->
            % The response must echo our request id.
            {sasl_auth, Status, ReqId, BodyLen} =
                couch_dcp_consumer:parse_header(Header),
            % Drain the body so it is not mangled with the next request;
            % only the status matters.
            case bufsocket_recv(Sock2, BodyLen, Timeout) of
            {error, _} = BodyError ->
                BodyError;
            {ok, _Body, Sock3} ->
                case Status of
                ?DCP_STATUS_OK ->
                    {ok, State#state{request_id = ReqId + 1, bufsocket = Sock3}};
                ?DCP_STATUS_SASL_AUTH_FAILED ->
                    {error, sasl_auth_failed}
                end
            end
        end
    end.
697
% Select Bucket on the authenticated connection.
%
% Returns {ok, State} with the next request id and the updated buffer, or
% {error, Reason}. A "not supported" answer counts as success: it happens
% when authentication already selected the bucket. Other error responses are
% unwrapped from parse_error_response/4's {{error, R}, BufSocket} result.
% Previously that tuple was returned as is, which broke the spec and made
% init/1 crash with case_clause instead of stopping with {stop, R}.
-spec select_bucket(binary(), #state{}) -> {ok, #state{}} | {error, term()}.
select_bucket(Bucket, State) ->
    #state{
        bufsocket = BufSocket,
        timeout = DcpTimeout,
        request_id = RequestId
    } = State,
    SelectBucket = couch_dcp_consumer:encode_select_bucket(Bucket, RequestId),
    case bufsocket_send(BufSocket, SelectBucket) of
    ok ->
        case bufsocket_recv(BufSocket, ?DCP_HEADER_LEN, DcpTimeout) of
        {ok, Header, BufSocket2} ->
            % The response must echo our request id.
            {select_bucket, Status, RequestId, BodyLength} =
                                    couch_dcp_consumer:parse_header(Header),
            case Status of
            ?DCP_STATUS_OK ->
                {ok, State#state{
                    request_id = RequestId + 1,
                    bufsocket = BufSocket2
                }};
            _ ->
                case parse_error_response(
                        BufSocket2, DcpTimeout, BodyLength, Status) of
                % When the authentication happened with bucket name and
                % password, then the correct bucket is already selected. In
                % this case a select bucket command returns "not supported".
                {{error, not_supported}, BufSocket3} ->
                    {ok, State#state{
                        request_id = RequestId + 1,
                        bufsocket = BufSocket3
                    }};
                {{error, _} = Error, _BufSocket3} ->
                    Error
                end
            end;
        {error, _} = Error ->
            Error
        end;
    {error, _} = Error ->
        Error
    end.
739
% Open a DCP connection named Name on the authenticated socket. Only the
% response header is read, and it must echo our request id. On success the
% request id is advanced.
-spec open_connection(binary(), #state{}) -> {ok, #state{}} | {error, term()}.
open_connection(Name, State) ->
    #state{bufsocket = Sock, timeout = Timeout, request_id = ReqId} = State,
    OpenReq = couch_dcp_consumer:encode_open_connection(Name, ReqId),
    case bufsocket_send(Sock, OpenReq) of
    {error, _} = SendError ->
        SendError;
    ok ->
        case bufsocket_recv(Sock, ?DCP_HEADER_LEN, Timeout) of
        {error, _} = RecvError ->
            RecvError;
        {ok, Header, Sock2} ->
            {open_connection, ReqId} = couch_dcp_consumer:parse_header(Header),
            {ok, next_request_id(State#state{bufsocket = Sock2})}
        end
    end.
762
763
% Read a snapshot marker body of BodyLength bytes. Return its
% {StartSeq, EndSeq, Type} together with the updated socket buffer.
-spec receive_snapshot_marker(#bufsocket{}, timeout(),  size()) ->
                                    {ok, {update_seq(), update_seq(),
                                          non_neg_integer()}, #bufsocket{}} |
                                    {error, closed}.
receive_snapshot_marker(BufSocket, Timeout, BodyLength) ->
    case bufsocket_recv(BufSocket, BodyLength, Timeout) of
    {error, _} = Error ->
        Error;
    {ok, Body, NextBufSocket} ->
        {snapshot_marker, StartSeq, EndSeq, Type} =
            couch_dcp_consumer:parse_snapshot_marker(Body),
        {ok, {StartSeq, EndSeq, Type}, NextBufSocket}
    end.
777
% Read a mutation body of BodyLength bytes for partition PartId and convert
% it into a non-deleted #dcp_doc{} carrying the header's Cas and DataType.
-spec receive_snapshot_mutation(#bufsocket{}, timeout(), partition_id(), size(),
                               size(), size(), uint64(), dcp_data_type()) ->
                               {#dcp_doc{}, #bufsocket{}} | {error, closed}.
receive_snapshot_mutation(BufSocket, Timeout, PartId, KeyLength, BodyLength,
        ExtraLength, Cas, DataType) ->
    case bufsocket_recv(BufSocket, BodyLength, Timeout) of
    {error, _} = Error ->
        Error;
    {ok, Body, NextBufSocket} ->
        {snapshot_mutation, #mutation{
            seq = Seq,
            rev_seq = RevSeq,
            flags = Flags,
            expiration = Expiration,
            key = Key,
            value = Value
        }} = couch_dcp_consumer:parse_snapshot_mutation(KeyLength, Body,
                 BodyLength, ExtraLength),
        Doc = #dcp_doc{
            id = Key,
            body = Value,
            data_type = DataType,
            partition = PartId,
            cas = Cas,
            rev_seq = RevSeq,
            seq = Seq,
            flags = Flags,
            expiration = Expiration,
            deleted = false
        },
        {Doc, NextBufSocket}
    end.
811
% Read a deletion body of BodyLength bytes for partition PartId and convert
% it into a deleted #dcp_doc{} with an empty body and zero flags/expiration.
-spec receive_snapshot_deletion(#bufsocket{}, timeout(), partition_id(), size(),
                               size(), uint64(), dcp_data_type()) ->
                                       {#dcp_doc{}, #bufsocket{}} |
                                       {error, closed | inet:posix()}.
receive_snapshot_deletion(BufSocket, Timeout, PartId, KeyLength, BodyLength,
        Cas, DataType) ->
    case bufsocket_recv(BufSocket, BodyLength, Timeout) of
    {error, _} = Error ->
        Error;
    {ok, Body, NextBufSocket} ->
        {snapshot_deletion, {Seq, RevSeq, Key, _Metadata}} =
            couch_dcp_consumer:parse_snapshot_deletion(KeyLength, Body),
        Doc = #dcp_doc{
            id = Key,
            body = <<>>,
            data_type = DataType,
            partition = PartId,
            cas = Cas,
            rev_seq = RevSeq,
            seq = Seq,
            flags = 0,
            expiration = 0,
            deleted = true
        },
        {Doc, NextBufSocket}
    end.
838
% Reads the stream-end body (the raw flag bytes) and returns it unparsed,
% together with the updated buffered socket. A failed socket read is
% returned unchanged as {error, Reason}.
-spec receive_stream_end(#bufsocket{}, timeout(), size()) ->
            {<<_:32>>, #bufsocket{}} | {error, closed | inet:posix()}.
receive_stream_end(BufSocket, Timeout, BodyLength) ->
    case bufsocket_recv(BufSocket, BodyLength, Timeout) of
    {error, _} = Failure ->
        Failure;
    {ok, FlagBytes, NextBufSocket} ->
        {FlagBytes, NextBufSocket}
    end.
848
849
% Returns the failover log, a list of 2-tuples of partition UUID and
% sequence number, as parsed by couch_dcp_consumer:parse_failover_log/1.
% The result is paired with the updated buffered socket.
%
% The body is only read when Status is ?DCP_STATUS_OK and BodyLength > 0.
% In the other cases this returns {error, no_failover_log_found} (empty
% body) or {error, Status} (non-OK status) WITHOUT consuming the body. A
% caller that wants to keep reading the connection must drain BodyLength
% bytes itself. A failed socket read is returned as {error, Reason}.
-spec receive_failover_log(#bufsocket{}, timeout(), char(), size()) ->
        {{'ok', list(partition_version())}, #bufsocket{}} |
        {error, no_failover_log_found | char() | closed | inet:posix()}.
receive_failover_log(_BufSocket, _Timeout, _Status, 0) ->
    {error, no_failover_log_found};
receive_failover_log(BufSocket, Timeout, ?DCP_STATUS_OK, BodyLength) ->
    case bufsocket_recv(BufSocket, BodyLength, Timeout) of
    {ok, Body, BufSocket2} ->
        {couch_dcp_consumer:parse_failover_log(Body), BufSocket2};
    {error, _} = Error ->
        Error
    end;
receive_failover_log(_BufSocket, _Timeout, Status, _BodyLength) ->
    {error, Status}.
868
% Reads the body of a stream-request reply with status ROLLBACK. The body
% must be exactly one ?DCP_SIZES_BY_SEQ-bit integer (the sequence number to
% roll back to); any other size crashes with case_clause. A failed socket
% read is returned unchanged as {error, Reason}.
-spec receive_rollback_seq(#bufsocket{}, timeout(), size()) ->
        {ok, update_seq(), #bufsocket{}} | {error, closed | inet:posix()}.
receive_rollback_seq(BufSocket, Timeout, BodyLength) ->
    Received = bufsocket_recv(BufSocket, BodyLength, Timeout),
    case Received of
    {error, _} ->
        Received;
    {ok, <<Seq:?DCP_SIZES_BY_SEQ>>, NextBufSocket} ->
        {ok, Seq, NextBufSocket}
    end.
878
879
% Reads one stat reply body (key followed by value) and decodes it with
% couch_dcp_consumer:parse_stat/4. The value length is
% BodyLength - KeyLength. The decoded result is paired with the updated
% buffered socket. A failed socket read is returned unchanged as
% {error, Reason}.
-spec receive_stat(#bufsocket{}, timeout(), dcp_status(), size(), size()) ->
        {{ok, {binary(), binary()}} | {error, {dcp_status(), binary()}},
         #bufsocket{}} |
        {error, closed | inet:posix()}.
receive_stat(BufSocket, Timeout, Status, BodyLength, KeyLength) ->
    case bufsocket_recv(BufSocket, BodyLength, Timeout) of
    {ok, Body, BufSocket2} ->
        {couch_dcp_consumer:parse_stat(
            Body, Status, KeyLength, BodyLength - KeyLength), BufSocket2};
    {error, _} = Error ->
        Error
    end.
893
% Reads an all-seqs reply body and decodes it with
% couch_dcp_consumer:parse_all_seqs/3. The decoded result is paired with the
% updated buffered socket. A failed socket read is returned unchanged as
% {error, Reason}.
-spec receive_all_seqs(#bufsocket{}, timeout(), dcp_status(), size()) ->
        {{ok, list()} | {error, {dcp_status(), binary()}}, #bufsocket{}} |
        {error, closed | inet:posix()}.
receive_all_seqs(BufSocket, Timeout, Status, BodyLength) ->
    case bufsocket_recv(BufSocket, BodyLength, Timeout) of
    {ok, Body, BufSocket2} ->
        {couch_dcp_consumer:parse_all_seqs(Status, Body, []), BufSocket2};
    {error, _} = Error ->
        Error
    end.
905
% Folds CallbackFn over the events of stream RequestId, fetching them one at
% a time with get_stream_event/2, until the stream ends or reports an error.
% Snapshot markers reach the callback as {snapshot_marker, Marker}; every
% other event's payload is passed as-is.
% Returns {ok, Acc} on stream_end and {error, Reason} on an error event.
-spec receive_events(pid(), request_id(), mutations_fold_fun(),
                     mutations_fold_acc()) -> {ok, mutations_fold_acc()} |
                                              {error, term()}.
receive_events(Pid, RequestId, CallbackFn, InAcc) ->
    {OpType, Payload} = get_stream_event(Pid, RequestId),
    if
    OpType =:= stream_end ->
        {ok, InAcc};
    OpType =:= error ->
        {error, Payload};
    true ->
        Item = case OpType of
        snapshot_marker -> {snapshot_marker, Payload};
        _ -> Payload
        end,
        receive_events(Pid, RequestId, CallbackFn, CallbackFn(Item, InAcc))
    end.
923
% Writes Data to the raw gen_tcp socket and returns gen_tcp:send/2's result
% unchanged.
-spec socket_send(socket(), iodata()) ->
        ok | {error, closed | inet:posix()}.
socket_send(Sock, Data) ->
    gen_tcp:send(Sock, Data).
928
% Sends Packet on the socket wrapped by BufSocket. Outgoing data does not
% touch the receive buffer.
-spec bufsocket_send(#bufsocket{}, iodata()) ->
        ok | {error, closed | inet:posix()}.
bufsocket_send(BufSocket, Packet) ->
    Sock = ?SOCKET(BufSocket),
    socket_send(Sock, Packet).
933
% Waits up to Timeout for one gen_tcp message ({tcp, ...}, {tcp_closed, ...}
% or {tcp_error, ...}) for the given socket. Received data is returned as
% {ok, Data}; closure and errors as {error, Reason}. A timeout is NOT an
% error: it yields {ok, <<>>}, so callers such as bufsocket_recv/3 simply
% wait again.
-spec socket_recv(socket(), timeout()) ->
        {ok, binary()} | {error, closed | inet:posix()}.
socket_recv(Sock, Timeout) ->
    receive
    {tcp, Sock, Bytes} ->
        {ok, Bytes};
    {tcp_error, Sock, Why} ->
        {error, Why};
    {tcp_closed, Sock} ->
        {error, closed}
    after Timeout ->
        {ok, <<>>}
    end.
947
% Returns exactly Length bytes from BufSocket. It takes them from the
% buffer when enough is buffered, and otherwise keeps pulling data from the
% socket until enough has arrived. Any surplus stays buffered in the returned
% #bufsocket{}. Because socket_recv/2 turns a timeout into empty data, this
% only gives up when the socket reports an error or closure.
-spec bufsocket_recv(#bufsocket{}, size(), timeout()) ->
        {ok, binary(), #bufsocket{}} | {error, closed | inet:posix()}.
bufsocket_recv(BufSocket, 0, _Timeout) ->
    {ok, <<>>, BufSocket};
bufsocket_recv(#bufsocket{sockbuf = Buffered} = BufSocket, Length, _Timeout)
        when byte_size(Buffered) >= Length ->
    <<Wanted:Length/binary, Leftover/binary>> = Buffered,
    {ok, Wanted, BufSocket#bufsocket{sockbuf = Leftover}};
bufsocket_recv(#bufsocket{sockbuf = Buffered} = BufSocket, Length, Timeout) ->
    case socket_recv(?SOCKET(BufSocket), Timeout) of
    {error, _} = Failure ->
        Failure;
    {ok, Chunk} ->
        Grown = BufSocket#bufsocket{sockbuf = <<Buffered/binary, Chunk/binary>>},
        bufsocket_recv(Grown, Length, Timeout)
    end.
969
% Records {ReqInfo, From} under RequestId in the pending-request table.
% An existing entry for the same id is overwritten.
-spec add_pending_request(#state{}, request_id(), term(), nil | {pid(), term()}) -> #state{}.
add_pending_request(#state{pending_requests = Pending} = State, RequestId, ReqInfo, From) ->
    Pending2 = dict:store(RequestId, {ReqInfo, From}, Pending),
    State#state{pending_requests = Pending2}.
977
% Drops RequestId from the pending-request table. Removing an unknown id is
% a no-op.
-spec remove_pending_request(#state{}, request_id()) -> #state{}.
remove_pending_request(State, RequestId) ->
    #state{
       pending_requests = PendingRequests
    } = State,
    PendingRequests2 = dict:erase(RequestId, PendingRequests),
    State#state{pending_requests = PendingRequests2}.
984
985
% Looks up the {ReqInfo, From} entry stored for RequestId, returning nil
% when there is none.
-spec find_pending_request(#state{}, request_id()) -> nil | {term(), nil | {pid(), term()}}.
find_pending_request(#state{pending_requests = Pending}, RequestId) ->
    case dict:find(RequestId, Pending) of
    {ok, Entry} ->
        Entry;
    error ->
        nil
    end.
997
% Advances the request id by one. It wraps back to 0 once the incremented
% value would no longer be below 1 bsl ?DCP_SIZES_OPAQUE.
-spec next_request_id(#state{}) -> #state{}.
next_request_id(#state{request_id = Current} = State) ->
    Next = if
    Current + 1 < (1 bsl ?DCP_SIZES_OPAQUE) ->
        Current + 1;
    true ->
        0
    end,
    State#state{request_id = Next}.
1007
% Forgets stream RequestId: it is removed from the active streams and its
% event queue is discarded. When this was the last active stream, all still
% unacknowledged bytes are acked via send_buffer_ack/1. A failure to send
% that ack crashes with badmatch.
-spec remove_request_queue(#state{}, request_id()) -> #state{}.
remove_request_queue(State, RequestId) ->
    #state{
       active_streams = ActiveStreams,
       stream_queues = StreamQueues
    } = State,
    ActiveStreams2 = lists:keydelete(RequestId, 2, ActiveStreams),
    % All active streams have finished reading:
    % ack the remaining unacked bytes.
    State2 = case ActiveStreams2 of
    [] ->
        {ok, AckedState} = send_buffer_ack(State),
        AckedState;
    _ ->
        State
    end,
    State2#state{
       active_streams = ActiveStreams2,
       stream_queues = dict:erase(RequestId, StreamQueues)
    }.
1030
1031
% Registers stream RequestId for partition PartId as active and gives it an
% empty event queue with no waiter.
-spec add_request_queue(#state{}, partition_id(), request_id()) -> #state{}.
add_request_queue(#state{active_streams = Active, stream_queues = Queues} = State,
                  PartId, RequestId) ->
    State#state{
        active_streams = [{PartId, RequestId} | Active],
        stream_queues = dict:store(RequestId, {nil, queue:new()}, Queues)
    }.
1044
1045
% Appends Event to the tail of RequestId's event queue and keeps its waiter.
% Crashes with badmatch if the stream has no queue.
-spec enqueue_stream_event(#state{}, request_id(), tuple()) -> #state{}.
enqueue_stream_event(#state{stream_queues = Queues} = State, RequestId, Event) ->
    {ok, {Waiter, EvQueue}} = dict:find(RequestId, Queues),
    EvQueue2 = queue:in(Event, EvQueue),
    State#state{stream_queues = dict:store(RequestId, {Waiter, EvQueue2}, Queues)}.
1058
% Pops the oldest event from RequestId's queue and returns
% {UpdatedState, Event}. Crashes if the stream is unknown or the queue is
% empty.
-spec dequeue_stream_event(#state{}, request_id()) ->
                              {#state{}, tuple()}.
dequeue_stream_event(#state{stream_queues = Queues} = State, RequestId) ->
    {ok, {Waiter, EvQueue}} = dict:find(RequestId, Queues),
    {{value, Event}, Remaining} = queue:out(EvQueue),
    Queues2 = dict:store(RequestId, {Waiter, Remaining}, Queues),
    {State#state{stream_queues = Queues2}, Event}.
1072
% Returns the oldest event in RequestId's queue without removing it.
% Crashes if the stream is unknown or the queue is empty.
-spec peek_stream_event(#state{}, request_id()) -> tuple().
peek_stream_event(#state{stream_queues = Queues}, RequestId) ->
    {ok, {_Waiter, EvQueue}} = dict:find(RequestId, Queues),
    {value, Oldest} = queue:peek(EvQueue),
    Oldest.
1081
% Registers NewWaiter for RequestId's event queue. Returns the updated state,
% or nil when a waiter is already registered. Crashes with badmatch if the
% stream has no queue.
-spec add_stream_event_waiter(#state{}, request_id(), term()) -> #state{} | nil.
add_stream_event_waiter(#state{stream_queues = Queues} = State, RequestId, NewWaiter) ->
    {ok, {CurrentWaiter, EvQueue}} = dict:find(RequestId, Queues),
    if
    CurrentWaiter =:= nil ->
        Queues2 = dict:store(RequestId, {NewWaiter, EvQueue}, Queues),
        State#state{stream_queues = Queues2};
    true ->
        nil
    end.
1097
1098
% Tells whether RequestId's queue holds at least one event. Returns nil when
% the stream has no queue at all.
-spec stream_event_present(#state{}, request_id()) -> nil | boolean().
stream_event_present(#state{stream_queues = Queues}, RequestId) ->
    case dict:find(RequestId, Queues) of
    {ok, {_Waiter, EvQueue}} ->
        not queue:is_empty(EvQueue);
    error ->
        nil
    end.
1110
1111
% Tells whether a waiter is registered for RequestId's queue. Returns nil
% when the stream has no queue at all.
-spec stream_event_waiters_present(#state{}, request_id()) -> nil | boolean().
stream_event_waiters_present(#state{stream_queues = Queues}, RequestId) ->
    case dict:find(RequestId, Queues) of
    error ->
        nil;
    {ok, {nil, _EvQueue}} ->
        false;
    {ok, {_Waiter, _EvQueue}} ->
        true
    end.
1123
1124
% Clears the waiter of RequestId's queue and returns {UpdatedState,
% PreviousWaiter}. PreviousWaiter may be nil. Crashes with badmatch if the
% stream has no queue.
-spec remove_stream_event_waiter(#state{}, request_id()) -> {#state{}, term()}.
remove_stream_event_waiter(#state{stream_queues = Queues} = State, RequestId) ->
    {ok, {PreviousWaiter, EvQueue}} = dict:find(RequestId, Queues),
    Queues2 = dict:store(RequestId, {nil, EvQueue}, Queues),
    {State#state{stream_queues = Queues2}, PreviousWaiter}.
1135
1136
% Returns the request id of the active stream for PartId, or nil when the
% partition has no active stream.
-spec find_stream_req_id(#state{}, partition_id()) -> request_id() | nil.
find_stream_req_id(#state{active_streams = Streams}, PartId) ->
    case lists:keyfind(PartId, 1, Streams) of
    false ->
        nil;
    {PartId, ReqId} ->
        ReqId
    end.
1148
% Consumes (and discards) the BodyLength-byte body of an error reply and
% maps Status to an error term via status_to_error/1. The result is
% {{error, Reason}, UpdatedBufSocket}. If reading the body fails, the bare
% socket error {error, SockReason} is returned instead.
-spec parse_error_response(#bufsocket{}, timeout(), integer(), integer()) ->
                    {{'error', atom() | {'status', integer()}}, #bufsocket{}} |
                    {'error', closed | inet:posix()}.
parse_error_response(BufSocket, Timeout, BodyLength, Status) ->
    case bufsocket_recv(BufSocket, BodyLength, Timeout) of
    {ok, _, BufSocket2} ->
        {status_to_error(Status), BufSocket2};
    {error, _} = Error ->
        Error
    end.
1159
% Maps a non-OK DCP status code to {error, Reason}. Known codes get a
% descriptive atom; any other code becomes {error, {status, Status}}.
-spec status_to_error(integer()) -> {'error', atom() | {'status', integer()}}.
status_to_error(Status) ->
    {error, dcp_status_reason(Status)}.

% Reason term for a DCP status code (helper of status_to_error/1).
-spec dcp_status_reason(integer()) -> atom() | {'status', integer()}.
dcp_status_reason(?DCP_STATUS_KEY_NOT_FOUND) -> vbucket_stream_not_found;
dcp_status_reason(?DCP_STATUS_ERANGE) -> wrong_start_sequence_number;
dcp_status_reason(?DCP_STATUS_KEY_EEXISTS) -> vbucket_stream_already_exists;
dcp_status_reason(?DCP_STATUS_NOT_MY_VBUCKET) -> server_not_my_vbucket;
dcp_status_reason(?DCP_STATUS_TMP_FAIL) -> vbucket_stream_tmp_fail;
dcp_status_reason(?DCP_STATUS_NOT_SUPPORTED) -> not_supported;
dcp_status_reason(Status) -> {status, Status}.
1175
1176
% The worker process for handling the DCP connection's downstream pipe.
% It reads and parses downstream messages and sends them to the Parent
% (gen_server) process. Stat replies are accumulated in MsgAcc0 and sent as
% one {ok, Stats} response when the empty-body OK stat reply arrives. Every
% other message is sent as soon as it has been decoded.
%
% The function never returns. It loops until reading from the socket fails
% and then exits with {conn_error, Reason}. This applies to message bodies
% as well as headers: after a failed body read the buffered socket no longer
% lines up with the byte stream, so carrying on would only deliver garbage.
-spec receive_worker(#bufsocket{}, timeout(), pid(), list()) -> no_return().
receive_worker(BufSocket, Timeout, Parent, MsgAcc0) ->
    case bufsocket_recv(BufSocket, ?DCP_HEADER_LEN, infinity) of
    {ok, Header, BufSocket2} ->
        {Action, MsgAcc, BufSocket3} =
        case couch_dcp_consumer:parse_header(Header) of
        {control_request, Status, RequestId} ->
            {done, {stream_response, RequestId, {RequestId, Status}},
                BufSocket2};
        {noop_request, RequestId} ->
            {done, {stream_noop, RequestId}, BufSocket2};
        {buffer_ack, ?DCP_STATUS_OK, _RequestId} ->
            {true, [], BufSocket2};
        {stream_request, Status, RequestId, BodyLength} ->
            {Response, BufSocket5} = case Status of
            ?DCP_STATUS_OK ->
                case receive_failover_log_reply(
                    BufSocket2, Timeout, Status, BodyLength) of
                {{ok, FailoverLog}, BufSocket4} ->
                    {{failoverlog, FailoverLog}, BufSocket4};
                Other ->
                    Other
                end;
            ?DCP_STATUS_ROLLBACK ->
                {ok, RollbackSeq, BufSocket4} = body_or_conn_error(
                    receive_rollback_seq(BufSocket2, Timeout, BodyLength)),
                {{rollback, RollbackSeq}, BufSocket4};
            _ ->
                body_or_conn_error(parse_error_response(
                    BufSocket2, Timeout, BodyLength, Status))
            end,
            {done, {stream_response, RequestId, {RequestId, Response}},
                BufSocket5};
        {failover_log, Status, RequestId, BodyLength} ->
            {Response, BufSocket5} = receive_failover_log_reply(
                BufSocket2, Timeout, Status, BodyLength),
            {done, {stream_response, RequestId, Response}, BufSocket5};
        {stream_close, Status, RequestId, BodyLength} ->
            {Response, BufSocket5} = case Status of
            ?DCP_STATUS_OK ->
                {ok, BufSocket2};
            _ ->
                body_or_conn_error(parse_error_response(
                    BufSocket2, Timeout, BodyLength, Status))
            end,
            {done, {stream_response, RequestId, Response}, BufSocket5};
        {stats, Status, RequestId, BodyLength, KeyLength} ->
            case BodyLength of
            0 ->
                case Status of
                ?DCP_STATUS_OK ->
                    StatAcc = lists:reverse(MsgAcc0),
                    {done, {stream_response, RequestId, {ok, StatAcc}},
                        BufSocket2};
                % Some errors might not contain a body
                _ ->
                    Error = {error, {Status, <<>>}},
                    {done, {stream_response, RequestId, Error}, BufSocket2}
                end;
            _ ->
                case body_or_conn_error(receive_stat(
                    BufSocket2, Timeout, Status, BodyLength, KeyLength)) of
                {{ok, Stat}, BufSocket5} ->
                    {true, [Stat | MsgAcc0], BufSocket5};
                % Error replies that do carry a body
                {{error, _} = Error, BufSocket5} ->
                    {done, {stream_response, RequestId, Error}, BufSocket5}
                end
            end;
        {snapshot_marker, _PartId, RequestId, BodyLength} ->
            {ok, SnapshotMarker, BufSocket5} = body_or_conn_error(
                receive_snapshot_marker(BufSocket2, Timeout, BodyLength)),
            {done, {stream_event, RequestId,
                {snapshot_marker, SnapshotMarker, BodyLength}}, BufSocket5};
        {snapshot_mutation, PartId, RequestId, KeyLength, BodyLength,
                ExtraLength, Cas, DataType} ->
            {Mutation, BufSocket5} = body_or_conn_error(
                receive_snapshot_mutation(BufSocket2, Timeout, PartId,
                    KeyLength, BodyLength, ExtraLength, Cas, DataType)),
            {done, {stream_event, RequestId,
                    {snapshot_mutation, Mutation, BodyLength}}, BufSocket5};
        % For the indexer and XDCR there's no difference between a deletion
        % and an expiration. In both cases the items should get removed.
        % Hence the same code can be used after the initial header
        % parsing (the body is the same).
        {OpCode, PartId, RequestId, KeyLength, BodyLength, Cas, DataType} when
                OpCode =:= snapshot_deletion orelse
                OpCode =:= snapshot_expiration ->
            {Deletion, BufSocket5} = body_or_conn_error(
                receive_snapshot_deletion(BufSocket2, Timeout, PartId,
                    KeyLength, BodyLength, Cas, DataType)),
            {done, {stream_event, RequestId,
                {snapshot_deletion, Deletion, BodyLength}}, BufSocket5};
        {stream_end, PartId, RequestId, BodyLength} ->
            {Flag, BufSocket5} = body_or_conn_error(
                receive_stream_end(BufSocket2, Timeout, BodyLength)),
            {done, {stream_event, RequestId, {stream_end,
                {RequestId, PartId, Flag}, BodyLength}}, BufSocket5};
        {all_seqs, Status, RequestId, BodyLength} ->
            {Resp, BufSocket5} = body_or_conn_error(
                receive_all_seqs(BufSocket2, Timeout, Status, BodyLength)),
            {done, {stream_response, RequestId, Resp}, BufSocket5}
        end,
        case Action of
        done ->
            Parent ! MsgAcc,
            receive_worker(BufSocket3, Timeout, Parent, []);
        true ->
            receive_worker(BufSocket3, Timeout, Parent, MsgAcc)
        end;
    {error, Reason} ->
        exit({conn_error, Reason})
    end.

% Passes the result of a body reader through unchanged unless it is a bare
% {error, Reason}, which the readers used by receive_worker/4 return only
% when the socket read failed. In that case the worker exits with
% {conn_error, Reason}, exactly like a failed header read.
-spec body_or_conn_error(term()) -> term().
body_or_conn_error({error, Reason}) ->
    exit({conn_error, Reason});
body_or_conn_error(Result) ->
    Result.

% Reads the body of a failover-log reply (also used for a successful stream
% request) and always returns {Response, BufSocket}. receive_failover_log/4
% consumes the body only when Status is ?DCP_STATUS_OK and BodyLength > 0.
% In every other case it returns {error, _} without reading, so the body is
% drained here to keep the next header aligned.
-spec receive_failover_log_reply(#bufsocket{}, timeout(), char(), size()) ->
        {term(), #bufsocket{}}.
receive_failover_log_reply(BufSocket, Timeout, Status, BodyLength) ->
    case receive_failover_log(BufSocket, Timeout, Status, BodyLength) of
    {_FailoverLog, #bufsocket{}} = Result ->
        Result;
    {error, Reason} when Status =:= ?DCP_STATUS_OK, BodyLength > 0 ->
        % The body was being read when the socket failed.
        exit({conn_error, Reason});
    {error, _} = Error ->
        {ok, _Body, BufSocket2} = body_or_conn_error(
            bufsocket_recv(BufSocket, BodyLength, Timeout)),
        {Error, BufSocket2}
    end.
1294
% Check if we need to send a buffer ack to the server, and send it if
% required.
%
% Type selects how the unacked byte count grows:
% - `mutation' adds the size of Event.
% - `remove_stream' adds the sizes of all events still queued for RequestId,
%   or nothing if the stream has no queue.
% When the new total exceeds MaxBufSize * ?DCP_BUFFER_ACK_THRESHOLD, the
% whole amount is acknowledged and the counter is reset to 0. Error events
% ({error, _, _}) are never counted. A failed send is returned as
% {error, Reason}.
-spec check_and_send_buffer_ack(#state{}, request_id(), tuple() | nil, atom()) ->
                        {ok, #state{}} | {error, closed | inet:posix()}.
check_and_send_buffer_ack(State, _RequestId, {error, _, _}, _Type) ->
    {ok, State};
check_and_send_buffer_ack(State, RequestId, Event, Type) ->
    #state{
        bufsocket = BufSocket,
        max_buffer_size = MaxBufSize,
        stream_queues = StreamQueues,
        total_buffer_size = Size
    } = State,
    Size2 = case Type of
    remove_stream ->
        case dict:find(RequestId, StreamQueues) of
        error ->
            Size;
        {ok, {_, EvQueue}} ->
            get_queue_size(EvQueue, Size)
        end;
    mutation ->
        Size + get_event_size(Event)
    end,
    MaxAckSize = MaxBufSize * ?DCP_BUFFER_ACK_THRESHOLD,
    if
    Size2 > MaxAckSize ->
        BufferAckRequest = couch_dcp_consumer:encode_buffer_request(0, Size2),
        case bufsocket_send(BufSocket, BufferAckRequest) of
        ok ->
            {ok, State#state{total_buffer_size = 0}};
        {error, _Reason} = Error ->
            % Already {error, Reason}; pass it through unwrapped, as the
            % spec promises.
            Error
        end;
    Size2 == 0 ->
        % Nothing is buffered; leave the state untouched.
        {ok, State};
    true ->
        {ok, State#state{total_buffer_size = Size2}}
    end.
1346
% Acknowledges every currently unacked byte (total_buffer_size) to the
% server and resets the counter to 0. A failed send is returned unchanged.
-spec send_buffer_ack(#state{}) ->
            {ok, #state{}} | {error, closed | inet:posix()}.
send_buffer_ack(#state{bufsocket = BufSocket, total_buffer_size = Unacked} = State) ->
    AckRequest = couch_dcp_consumer:encode_buffer_request(0, Unacked),
    case bufsocket_send(BufSocket, AckRequest) of
    {error, _Reason} = Failure ->
        Failure;
    ok ->
        {ok, State#state{total_buffer_size = 0}}
    end.
1361
1362
% Sends a connection control request announcing a buffer size of Size,
% using the current request id. On success the request id is advanced, the
% control request is recorded as pending (with no caller), and
% max_buffer_size is set to Size.
-spec set_buffer_size(#state{}, non_neg_integer()) -> {ok ,#state{}} |
                                           {error, closed | inet:posix()}.
set_buffer_size(#state{bufsocket = BufSocket, request_id = RequestId} = State, Size) ->
    ControlRequest = couch_dcp_consumer:encode_control_request(RequestId, connection, Size),
    case bufsocket_send(BufSocket, ControlRequest) of
    {error, _} = Failure ->
        Failure;
    ok ->
        Advanced = next_request_id(State),
        WithPending = add_pending_request(Advanced, RequestId, {control_request, Size}, nil),
        {ok, WithPending#state{max_buffer_size = Size}}
    end.
1380
% Adds the size (per get_event_size/1) of every event in EvQueue to Size and
% returns the total. The queue itself is not modified.
-spec get_queue_size(queue(), non_neg_integer()) -> non_neg_integer().
get_queue_size(EvQueue, Size) ->
    lists:foldl(fun(Event, Total) -> Total + get_event_size(Event) end,
                Size, queue:to_list(EvQueue)).
1390
% Wire size of a queued event: its body length plus one header.
-spec get_event_size({atom(), #dcp_doc{}, non_neg_integer()}) -> non_neg_integer().
get_event_size({_Type, _Doc, BodyLen}) ->
    BodyLen + ?DCP_HEADER_LEN.
1394
% Drops the stream info of the active stream for PartId. Leaves the state
% unchanged when the partition has no active stream.
-spec remove_stream_info(partition_id(), #state{}) -> #state{}.
remove_stream_info(PartId, #state{active_streams = Active, stream_info = Infos} = State) ->
    case lists:keyfind(PartId, 1, Active) of
    {_PartId, RequestId} ->
        State#state{stream_info = dict:erase(RequestId, Infos)};
    false ->
        State
    end.
1408
% Stores a fresh #stream_info{} for RequestId describing the requested
% partition, UUID, sequence range and flags. Any previous entry is replaced.
-spec insert_stream_info(partition_id(), request_id(), uuid(), non_neg_integer(),
                        non_neg_integer(), #state{}, non_neg_integer()) -> #state{}.
insert_stream_info(PartId, RequestId, PartUuid, StartSeq, EndSeq,
                   #state{stream_info = Infos} = State, Flags) ->
    Info = #stream_info{
        part_id = PartId,
        part_uuid = PartUuid,
        start_seq = StartSeq,
        end_seq = EndSeq,
        flags = Flags
    },
    State#state{stream_info = dict:store(RequestId, Info, Infos)}.
1422
% Returns the #stream_info{} stored for RequestId, or nil if there is none.
-spec find_stream_info(request_id(), #state{}) -> #stream_info{} | nil.
find_stream_info(RequestId, #state{stream_info = Infos}) ->
    case dict:find(RequestId, Infos) of
    error ->
        nil;
    {ok, Info} ->
        Info
    end.
1434
% Sends a stream request for the given partition and sequence range using
% the current request id. On success it records the stream info, advances
% the request id and registers the request as pending, so that From (a
% gen_server caller, replied to later) gets the outcome. Returns the new
% state, or the send error as {error, Reason}.
-spec add_new_stream({partition_id(), uuid(), update_seq(), update_seq(),
        {update_seq(), update_seq()}, 0..255},
        {pid(), term()}, #state{}) -> #state{} | {error, closed | inet:posix()}.
add_new_stream({PartId, PartUuid, StartSeq, EndSeq,
    {SnapSeqStart, SnapSeqEnd}, Flags}, From, State) ->
    #state{
        bufsocket = BufSocket,
        request_id = RequestId
    } = State,
    StreamRequest = couch_dcp_consumer:encode_stream_request(
        PartId, RequestId, Flags, StartSeq, EndSeq, PartUuid, SnapSeqStart, SnapSeqEnd),
    case bufsocket_send(BufSocket, StreamRequest) of
    ok ->
        State2 = insert_stream_info(PartId, RequestId, PartUuid, StartSeq, EndSeq,
            State, Flags),
        State3 = next_request_id(State2),
        add_pending_request(State3, RequestId, {add_stream, PartId}, From);
    {error, _Reason} = Error ->
        % bufsocket_send/2 already returns {error, Reason}; pass it
        % through instead of wrapping it a second time.
        Error
    end.
1455
% Re-runs init/1 with the stored start arguments to get a new connection
% and receive worker. The new socket and worker pid are swapped into the
% current state; streams, queues and stream info are kept.
%
% Requests that were pending on the old connection are answered with
% {error, dcp_conn_closed}. get_stats and all_seqs entries hold
% {MRef, Pid} and get a plain message; control requests have no caller and
% are skipped. Additionally, {stream_event, RequestId,
% {error, dcp_conn_closed, 0}} is sent to self() for every stream in
% stream_info. If init/1 answers {stop, Reason}, {stop, Reason, State} is
% returned.
-spec restart_worker(#state{}) -> {noreply, #state{}} | {stop, term(), #state{}}.
restart_worker(State) ->
    #state{
        args = Args
    } = State,
    case init(Args) of
    {stop, Reason} ->
        {stop, Reason, State};
    {ok, State2} ->
        #state{
            bufsocket = BufSocket,
            worker_pid = WorkerPid
        } = State2,
        % Replace the socket and worker; requests made on the old
        % connection are no longer tracked.
        State3 = State#state{
            bufsocket = BufSocket,
            pending_requests = dict:new(),
            worker_pid = WorkerPid
        },
        Error = {error, dcp_conn_closed},
        % dict:map/2 is only used to visit every entry; its result is
        % discarded.
        dict:map(fun(_RequestId, {ReqInfo, SendTo}) ->
            case ReqInfo of
            get_stats ->
                {MRef, From} = SendTo,
                From ! {get_stats, MRef, Error};
            all_seqs ->
                {MRef, From} = SendTo,
                From ! {all_seqs, MRef, Error};
            {control_request, _} ->
                ok;
            _ ->
                gen_server:reply(SendTo, Error)
            end
        end, State#state.pending_requests),
        dict:map(fun(RequestId, _Value) ->
            Event = {error, dcp_conn_closed, 0},
            self() ! {stream_event, RequestId, Event}
        end, State#state.stream_info),
        {noreply, State3}
    end.
1496
% Remembers the {StartSeq, EndSeq} of the latest snapshot marker in the
% stream info of RequestId. The marker type is ignored. Unknown request ids
% leave the state unchanged.
-spec store_snapshot_seq(request_id(), {update_seq(), update_seq(),
                           non_neg_integer()}, #state{}) -> #state{}.
store_snapshot_seq(RequestId, {StartSeq, EndSeq, _Type},
                   #state{stream_info = Infos} = State) ->
    case dict:find(RequestId, Infos) of
    error ->
        State;
    {ok, Info} ->
        Info2 = Info#stream_info{snapshot_seq = {StartSeq, EndSeq}},
        State#state{stream_info = dict:store(RequestId, Info2, Infos)}
    end.
1512
% Moves the start_seq recorded for RequestId up to the seq of the mutation
% just received. Unknown request ids leave the state unchanged.
-spec store_snapshot_mutation(request_id(), #dcp_doc{}, #state{}) -> #state{}.
store_snapshot_mutation(RequestId, #dcp_doc{seq = Seq},
                        #state{stream_info = Infos} = State) ->
    case dict:find(RequestId, Infos) of
    {ok, Info} ->
        Info2 = Info#stream_info{start_seq = Seq},
        State#state{stream_info = dict:store(RequestId, Info2, Infos)};
    error ->
        State
    end.
1529
% Drops the trailing body length from a {Type, Data, BodyLength} stream
% event and returns {Type, Data}. Data is typically a tuple or #dcp_doc{}.
% Error events such as {error, dcp_conn_closed, 0} carry an atom, so the
% spec accepts any term.
-spec remove_body_len({atom(), term(), non_neg_integer()}) ->
                                                {atom(), term()}.
remove_body_len({Type, Data, _BodyLength}) ->
    {Type, Data}.
1534