1% Licensed under the Apache License, Version 2.0 (the "License"); you may not
2% use this file except in compliance with the License. You may obtain a copy of
3% the License at
4%
5%   http://www.apache.org/licenses/LICENSE-2.0
6%
7% Unless required by applicable law or agreed to in writing, software
8% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10% License for the specific language governing permissions and limitations under
11% the License.
12
13-module(couch_dcp_fake_server).
14-behaviour(gen_server).
15
16% Public API
17-export([start/1, reset/0]).
18
% Only used by tests
20-export([set_failover_log/2, set_persisted_items_fun/1,
21    set_items_per_snapshot/1, set_dups_per_snapshot/1,
22    pause_mutations/0, ceil_div/2, num_items_with_dups/3]).
23-export([send_single_mutation/0, continue_mutations/0]).
24-export([get_num_buffer_acks/0, is_control_req/0, close_connection/1]).
25-export([close_on_next/0]).
26
27% Needed for internal process spawning
28-export([accept/1, accept_loop/1]).
29
30% gen_server callbacks
31-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2,
32    code_change/3]).
33
34-include_lib("couch_dcp/include/couch_dcp.hrl").
35-include_lib("couch_dcp/include/couch_dcp_typespecs.hrl").
36
37
% Builds the database name of a single partition: <<SetName/binary, "/", PartId>>,
% e.g. <<"myset/3">>.
-define(dbname(SetName, PartId),
    <<SetName/binary, $/, (list_to_binary(integer_to_list(PartId)))/binary>>).
% Builds the name of the master database of a set, e.g. <<"myset/master">>.
-define(master_dbname(SetName), <<SetName/binary, "/master">>).


% #doc_info{}, #doc{}, #db{} are copy & pasted from couch_db.hrl
% NOTE(review): records are plain tuples at runtime, so these copies must stay
% field-for-field identical to the ones in couch_db.hrl (e.g. `Db#db.update_seq`
% is read from databases opened through couch_db) - confirm when couch_db.hrl
% changes.
-record(doc_info,
    {
    id = <<"">>,
    deleted = false,
    local_seq,
    rev = {0, <<>>},
    body_ptr,
    content_meta = 0, % should be 0-255 only.
    size = 0
    }).
-record(doc,
    {
    id = <<>>,
    rev = {0, <<>>},

    % the binary body
    body = <<"{}">>,
    content_meta = 0, % should be 0-255 only.

    deleted = false,

    % key/value tuple of meta information, provided when using special options:
    % couch_db:open_doc(Db, Id, Options).
    meta = []
    }).
-record(db,
    {main_pid = nil,
    update_pid = nil,
    compactor_info = nil,
    instance_start_time, % number of microsecs since jan 1 1970 as a binary string
    fd,
    fd_ref_counter,
    header,% = #db_header{},
    committed_update_seq,
    docinfo_by_id_btree,
    docinfo_by_seq_btree,
    local_docs_btree,
    update_seq,
    name,
    filepath,
    security = [],
    security_ptr = nil,
    user_ctx,% = #user_ctx{},
    waiting_delayed_commit = nil,
    fsync_options = [],
    options = []
    }).
91
92
% State of the fake DCP server process.
-record(state, {
    % Open streams, as a list of
    % {PartId, {RequestId, Mutations, Socket, NumSentInCurrentSnapshot}}.
    streams = [],
    % Name of the set the partitions belong to.
    setname = nil,
    % Failover logs explicitly set by tests, keyed by partition ID. Partitions
    % without an entry get a default failover log.
    failover_logs = dict:new(),
    % When true, mutations are only sent on explicit request
    % (send_single_mutation/0 or continue_mutations/0).
    pause_mutations = false,
    % This dictionary contains the meta data for flow control (number of
    % buffer acks, last control request) that is inspected by unit tests.
    control_flow = dict:new(),
    % This function is used to simulate stream requests on persisted items
    % only. It returns the sequence number up to which the items should be
    % considered as persisted.
    % By default it returns half of the current high sequence number, but at
    % least 1. This ensures that many of the current test suites run through
    % the code path of restarting the updater.
    persisted_items_fun = fun(Seq) -> max(Seq div 2, 1) end,
    % Determines how many items a snapshot should have. If it's set to 0,
    % all mutations will be returned in a single snapshot.
    items_per_snapshot = 0,
    % The number of duplicates a snapshot should contain. This option needs
    % to be combined with items_per_snapshot. A snapshot will contain the
    % given number of items from a previous snapshot (randomly chosen).
    dups_per_snapshot = 0,
    % Whether to close the connection on the next gen_server call
    % NOTE vmx 2015-12-15: Currently only seqs and stats requests are closed
    close_on_next = false
}).
119
120
121% Public API
122
% Start the fake DCP server as a standalone (unlinked) process registered
% under the module name. It listens where the real DCP server is expected:
% on the port configured as "dcp"/"port". The default of "0" lets the OS
% pick a free port.
-spec start(binary()) -> {ok, pid()} | ignore |
                         {error, {already_started, pid()} | term()}.
start(SetName) ->
    % Start the fake DCP server where the original one is expected to be
    Port = list_to_integer(couch_config:get("dcp", "port", "0")),
    gen_server:start({local, ?MODULE}, ?MODULE, [Port, SetName], []).

% Reset the server state to its defaults. Only the set name is kept.
-spec reset() -> ok.
reset() ->
    gen_server:call(?MODULE, reset).

% Only used by tests to populate the failover log
-spec set_failover_log(partition_id(), partition_version()) -> ok.
set_failover_log(PartId, FailoverLog) ->
    gen_server:call(?MODULE, {set_failover_log, PartId, FailoverLog}).

% For unit tests only. Replace the function that maps the requested end
% sequence number to the last "persisted" sequence number.
-spec set_persisted_items_fun(fun((update_seq()) -> update_seq())) -> ok.
set_persisted_items_fun(Fun) ->
    gen_server:call(?MODULE, {set_persisted_items_fun, Fun}).

% For unit tests only. 0 means all mutations are sent in a single snapshot.
-spec set_items_per_snapshot(non_neg_integer()) -> ok.
set_items_per_snapshot(Num) ->
    gen_server:call(?MODULE, {set_items_per_snapshot, Num}).

% For unit tests only. Needs to be combined with set_items_per_snapshot/1.
-spec set_dups_per_snapshot(non_neg_integer()) -> ok.
set_dups_per_snapshot(Num) ->
    gen_server:call(?MODULE, {set_dups_per_snapshot, Num}).

% Stop sending mutations automatically. They can then be sent one by one with
% send_single_mutation/0.
-spec pause_mutations() -> ok.
pause_mutations() ->
    gen_server:call(?MODULE, {pause_mutations, true}).

% Resume sending mutations automatically and kick off sending right away.
-spec continue_mutations() -> ok.
continue_mutations() ->
    ok = gen_server:call(?MODULE, {pause_mutations, false}),
    ?MODULE ! send_mutations,
    ok.

% Used by unit tests. Returns how many buffer acks the server received.
-spec get_num_buffer_acks() -> non_neg_integer().
get_num_buffer_acks() ->
    gen_server:call(?MODULE, get_num_buffer_acks).

% Used by unit tests. Check if server got control request.
-spec is_control_req() -> boolean().
is_control_req() ->
    gen_server:call(?MODULE, is_control_req).

% Used by unit tests. Close the stream connection of the given partition, or
% the connections of all streams if `PartId` is `nil`.
-spec close_connection(partition_id() | nil) -> ok.
close_connection(PartId) ->
    ok = gen_server:call(?MODULE, {close_connection, PartId}).

% Used by unit tests to control the flow of mutation messages from server to
% client: sends the next mutation of every open stream.
-spec send_single_mutation() -> ok.
send_single_mutation() ->
    ?MODULE ! send_mutations,
    ok.

% Used by unit tests. Closes the connection on the next command it receives.
% NOTE: only the "all seqs" and "stats" requests honor this flag.
-spec close_on_next() -> ok.
close_on_next() ->
    ok = gen_server:call(?MODULE, close_on_next).
187
188% gen_server callbacks
189
% Open the listening socket and spawn the first acceptor process.
% `Port` is a TCP port number (0 = let the OS choose one), `SetName` the name
% of the set whose partitions are served.
-spec init([inet:port_number() | binary()]) -> {ok, #state{}}.
init([Port, SetName]) ->
    {ok, Listen} = gen_tcp:listen(Port,
        [binary, {packet, raw}, {active, false}, {reuseaddr, true}]),
    % In case the port was set to 0, the OS decides which port the fake DCP
    % server runs on. Update the configuration so that we know which port
    % was chosen (that's only needed for the tests).
    case Port of
    0 ->
        {ok, RandomPort} = inet:port(Listen),
        couch_config:set("dcp", "port", integer_to_list(RandomPort), false);
    _ ->
        ok
    end,
    accept(Listen),
    {ok, #state{
        streams = [],
        setname = SetName
    }}.
209
210
% Synchronous requests. They come from the per-connection reader processes
% (stream requests, stats, flow control) and from the test-only API of this
% module. There is no catch-all clause: an unknown request crashes the server.
-spec handle_call(tuple() | atom(), {pid(), term()}, #state{}) ->
                         {reply, any(), #state{}}.
% Open a new stream for a partition, unless one is already open. All
% mutations of the stream are created upfront; sending them is driven by
% `send_mutations` messages (see handle_info/2).
handle_call({add_stream, PartId, RequestId, StartSeq, EndSeq, Socket, FailoverLog}, _From, State) ->
    #state{
        streams = Streams,
        setname = SetName,
        pause_mutations = Pause,
        items_per_snapshot = ItemsPerSnapshot,
        dups_per_snapshot = DupsPerSnapshot
    } = State,
    case lists:keyfind(PartId, 1, Streams) of
    false ->
        StreamOk = couch_dcp_producer:encode_stream_request_ok(
            RequestId, FailoverLog),
        ok = gen_tcp:send(Socket, StreamOk),
        Mutations = case DupsPerSnapshot > 0 of
        true ->
            % Duplicates only work together with a non-zero
            % items_per_snapshot, as it is used as the divisor here.
            NumSnapshots = ceil_div(EndSeq - StartSeq, ItemsPerSnapshot),
            % The first snapshot mustn't contain duplicates
            EndSeq2 = EndSeq - (NumSnapshots - 1) * DupsPerSnapshot,
            Mutations2 = create_mutations(SetName, PartId, StartSeq, EndSeq2),
            Mutations3 = case StartSeq of
            0 ->
                create_mutations_dups(Mutations2, ItemsPerSnapshot,
                    DupsPerSnapshot, 0);
            _ ->
                % create_mutations_dups/4 doesn't create duplicates in the
                % first snapshot. If we are not in an initial index build
                % (start seq > 0), then the first snapshot *must* contain
                % duplicates. To KISS we just take DupsPerSnapshot number of
                % items from the second (still unique) snapshot and prepend
                % them.
                Prepend = lists:sublist(Mutations2, ItemsPerSnapshot + 1,
                    DupsPerSnapshot),
                Dups = create_mutations_dups(Mutations2, ItemsPerSnapshot,
                    DupsPerSnapshot, DupsPerSnapshot),
                Prepend ++ Dups
            end,
            apply_sequence_numbers(Mutations3);
        false ->
            create_mutations(SetName, PartId, StartSeq, EndSeq)
        end,
        % Number of items in the first snapshot
        Num = case ItemsPerSnapshot of
        0 ->
            length(Mutations);
        _ ->
            min(length(Mutations), ItemsPerSnapshot)
        end,
        Streams2 =
            [{PartId, {RequestId, Mutations, Socket, 0}} | Streams],
        case Pause of
        true ->
            ok;
        false ->
            % For unit tests it's OK to pretend that only snapshots
            % received from the start are on-disk snapshots.
            SnapshotType = case StartSeq of
            0 ->
                ?DCP_SNAPSHOT_TYPE_DISK;
            _ ->
                ?DCP_SNAPSHOT_TYPE_MEMORY
            end,
            Marker = couch_dcp_producer:encode_snapshot_marker(
                PartId, RequestId, StartSeq, StartSeq + Num, SnapshotType),
            ok = gen_tcp:send(Socket, Marker),
            self() ! send_mutations
        end,
        {reply, ok, State#state{streams = Streams2}};
    _ ->
        StreamExists = couch_dcp_producer:encode_stream_request_error(
                         RequestId, ?DCP_STATUS_KEY_EEXISTS),
        ok = gen_tcp:send(Socket, StreamExists),
        {reply, ok, State}
    end;

% Remove the stream of a partition. Replies `vbucket_stream_not_found` if
% there is no such stream.
handle_call({remove_stream, PartId}, _From, #state{streams = Streams} = State) ->
    Reply = case lists:keymember(PartId, 1, Streams) of
    true ->
        ok;
    false ->
        vbucket_stream_not_found
    end,
    Streams2 = lists:keydelete(PartId, 1, Streams),
    {reply, Reply, State#state{streams = Streams2}};

handle_call({set_failover_log, PartId, FailoverLog}, _From, State) ->
    FailoverLogs = dict:store(PartId, FailoverLog, State#state.failover_logs),
    {reply, ok, State#state{
        failover_logs = FailoverLogs
    }};

handle_call({set_persisted_items_fun, Fun}, _From, State) ->
    {reply, ok, State#state{
        persisted_items_fun = Fun
    }};

handle_call({set_items_per_snapshot, Num}, _From, State) ->
    {reply, ok, State#state{
        items_per_snapshot = Num
    }};

handle_call({set_dups_per_snapshot, Num}, _From, State) ->
    {reply, ok, State#state{
        dups_per_snapshot = Num
    }};

% Simulate a dropped connection if requested by close_on_next/0
handle_call({all_seqs, Socket, _}, _From,
            #state{close_on_next = true} = State) ->
    ok = gen_tcp:close(Socket),
    {reply, ok, State#state{close_on_next = false}};

% Send the high sequence numbers of all partitions of the set
handle_call({all_seqs, Socket, RequestId}, _From, State) ->
    #state{
        setname = SetName,
        items_per_snapshot = ItemsPerSnapshot,
        dups_per_snapshot = DupsPerSnapshot
    } = State,
    Partitions = list_partitions(SetName),
    Result = lists:foldl(fun(PartId, Acc) ->
        case get_sequence_number(SetName, PartId, ItemsPerSnapshot,
            DupsPerSnapshot) of
        {ok, Seq} ->
            [{PartId, Seq} | Acc]
        end
    end, [], Partitions),
    Data = couch_dcp_producer:encode_seqs(RequestId, lists:reverse(Result)),
    ok = gen_tcp:send(Socket, Data),
    {reply, ok, State};

% Simulate a dropped connection if requested by close_on_next/0
handle_call({send_stat, _, Socket, _, _}, _From,
            #state{close_on_next = true} = State) ->
    ok = gen_tcp:close(Socket),
    {reply, ok, State#state{close_on_next = false}};

% Answer a stats request. Only "vbucket-seqno" (all or a single partition)
% and "vbucket-details <PartId>" are supported.
handle_call({send_stat, Stat, Socket, RequestId, PartId}, _From, State) ->
    #state{
        setname = SetName
    } = State,
    case binary:split(Stat, <<" ">>) of
    [<<"vbucket-seqno">>] ->
        Partitions = list_partitions(SetName),
        send_vbucket_seqnos_stats(State, SetName, Socket, RequestId, Partitions);
    [<<"vbucket-seqno">>, BinPartId] ->
        PartIdInt = binary_to_integer(BinPartId),
        send_vbucket_seqnos_stats(State, SetName, Socket, RequestId, [PartIdInt]);
    [<<"vbucket-details">>, _] ->
        case get_num_items(SetName, PartId) of
        {ok, NumItems} ->
            BinPartId = integer_to_binary(PartId),
            NumItemsKey = <<"vb_", BinPartId/binary, ":num_items">>,
            NumItemsValue = integer_to_binary(NumItems),
            % The real vbucket-details response contains a lot of more
            % stats, but we only care about the num_items
            NumItemsStat = couch_dcp_producer:encode_stat(
                RequestId, NumItemsKey, NumItemsValue),
            ok = gen_tcp:send(Socket, NumItemsStat),

            % An empty key/value pair terminates the stats response
            EndStat = couch_dcp_producer:encode_stat(RequestId, <<>>, <<>>),
            ok = gen_tcp:send(Socket, EndStat);
        {error, not_my_partition} ->
            StatError = couch_dcp_producer:encode_stat_error(
                RequestId, ?DCP_STATUS_NOT_MY_VBUCKET, <<>>),
            ok = gen_tcp:send(Socket, StatError)
        end
    end,
    {reply, ok, State};

handle_call({get_failover_log, PartId}, _From, State) ->
    FailoverLog = get_failover_log(PartId, State),
    {reply, FailoverLog, State};

handle_call(get_set_name, _From, State) ->
    {reply, State#state.setname, State};

handle_call(get_persisted_items_fun, _From, State) ->
    {reply, State#state.persisted_items_fun, State};

handle_call(get_items_per_snapshot, _From, State) ->
    {reply, State#state.items_per_snapshot, State};

handle_call(get_dups_per_snapshot, _From, State) ->
    {reply, State#state.dups_per_snapshot, State};

handle_call({pause_mutations, Flag}, _From, State) ->
    {reply, ok, State#state{pause_mutations = Flag}};

% Reset everything to the defaults, except for the set name
handle_call(reset, _From, State0) ->
    State = #state{
        setname = State0#state.setname
    },
    {reply, ok, State};

% Increment the count for the buffer ack request
handle_call({handle_buffer_ack, _Size}, _From, State) ->
    #state{
        control_flow = ControlFlow
    } = State,
    Val = case dict:find(num_buffer_acks, ControlFlow) of
    error ->
        1;
    {ok, Value} ->
        Value + 1
    end,
    ControlFlow2 = dict:store(num_buffer_acks, Val, ControlFlow),
    {reply, ok, State#state{
        control_flow = ControlFlow2
    }};

% Remember the value of the last control request
handle_call({handle_control_req, Size}, _From, State) ->
    #state{
        control_flow = ControlFlow
    } = State,
    ControlFlow2 = dict:store(control_req, Size, ControlFlow),
    {reply, ok, State#state{
        control_flow = ControlFlow2
    }};

% Return the current count of buffer ack requests
handle_call(get_num_buffer_acks, _From, State) ->
    #state{
        control_flow = ControlFlow
    } = State,
    Val = case dict:find(num_buffer_acks, ControlFlow) of
    error ->
        0;
    {ok, Value} ->
        Value
    end,
    {reply, Val, State};

% Return true if we got control request
handle_call(is_control_req, _From, State) ->
    #state{
        control_flow = ControlFlow
    } = State,
    Val = case dict:find(control_req, ControlFlow) of
    error ->
        false;
    _ ->
        true
    end,
    {reply, Val, State};

% Close the stream connection of the given partition, or all of them if
% PartId = nil. Closed streams are removed from the state.
handle_call({close_connection, PartId}, _From, State) ->
    #state{
        streams = Streams
    } = State,
    Streams2 = lists:foldl(
        fun({PartId2, {_RequestId, _Mutation, Socket, _NumSent}} = Entry, Acc) ->
            case PartId of
            nil ->
                ok = gen_tcp:close(Socket),
                Acc;
            PartId2 ->
                ok = gen_tcp:close(Socket),
                Acc;
            _ ->
                [Entry | Acc]
            end
        end,
        [], Streams),
    {reply, ok, State#state{streams = Streams2}};

% Close the connection after the next received command
handle_call(close_on_next, _From, State) ->
    {reply, ok, State#state{close_on_next = true}}.
479
% The fake server doesn't support any casts: receiving one is a programming
% error, so the server stops with the offending message as reason.
-spec handle_cast(any(), #state{}) ->
                         {stop, {unexpected_cast, any()}, #state{}}.
handle_cast(Request, State) ->
    StopReason = {unexpected_cast, Request},
    {stop, StopReason, State}.
484
% Asynchronous messages:
%  - `{'EXIT', Pid, normal}` from a linked acceptor/reader process that ended
%    normally (the server traps exits, see accept/1). Other exit reasons are
%    not handled and crash the server.
%  - `send_mutations` sends the next mutation of every open stream, or the
%    stream end once all of its mutations were sent. Unless mutations are
%    paused, the message is re-sent to self as long as streams are left.
-spec handle_info({'EXIT', pid(), normal} | send_mutations, #state{}) ->
                         {noreply, #state{}}.
handle_info({'EXIT', _From, normal}, State) ->
    {noreply, State};

handle_info(send_mutations, State) ->
    #state{
        streams = Streams,
        pause_mutations = Pause,
        items_per_snapshot = ItemsPerSnapshot
    } = State,
    Streams2 = lists:foldl(fun
        ({VBucketId, {RequestId, [Mutation | Rest], Socket, NumSent0}}, Acc) ->
            {Cas, Seq, RevSeq, Flags, Expiration, LockTime, Key, Value} = Mutation,
            % Start a new snapshot once the current one is full
            NumSent = case ItemsPerSnapshot > 0 andalso
                    NumSent0 =:= ItemsPerSnapshot of
            true ->
                NumItems = min(length(Rest) + 1, ItemsPerSnapshot),
                Marker = couch_dcp_producer:encode_snapshot_marker(
                    VBucketId, RequestId, Seq - 1, Seq + NumItems - 1,
                    ?DCP_SNAPSHOT_TYPE_MEMORY),
                ok = gen_tcp:send(Socket, Marker),
                1;
            false ->
                NumSent0 + 1
            end,
            Encoded = case Value of
            deleted ->
                couch_dcp_producer:encode_snapshot_deletion(
                    VBucketId, RequestId, Cas, Seq, RevSeq, Key);
            _ ->
                couch_dcp_producer:encode_snapshot_mutation(
                    VBucketId, RequestId, Cas, Seq, RevSeq, Flags, Expiration,
                    LockTime, Key, Value)
            end,
            ok = gen_tcp:send(Socket, Encoded),
            [{VBucketId, {RequestId, Rest, Socket, NumSent}} | Acc];
        ({VBucketId, {RequestId, [], Socket, _NumSent}}, Acc) ->
            % All mutations were sent: end the stream and drop it
            StreamEnd = couch_dcp_producer:encode_stream_end(VBucketId, RequestId),
            ok = gen_tcp:send(Socket, StreamEnd),
            Acc
        end, [], Streams),
    case Streams2 =/= [] andalso not Pause of
    true ->
        self() ! send_mutations;
    false ->
        ok
    end,
    {noreply, State#state{streams = Streams2}}.
540
541
% No explicit cleanup is done on shutdown.
-spec terminate(any(), #state{}) -> ok.
terminate(_Reason, _State) -> ok.

% Hot code upgrades keep the state unchanged.
-spec code_change(any(), #state{}, any()) -> {ok, #state{}}.
code_change(_OldVsn, State, _Extra) -> {ok, State}.
549
550
551% Internal functions
552
% Ask the server process for the failover log of a partition. Must not be
% called from within the server process itself.
-spec get_failover_log(partition_id()) -> partition_version().
get_failover_log(PartId) ->
    Request = {get_failover_log, PartId},
    gen_server:call(?MODULE, Request).

% Look up the failover log of a partition in the server state. Partitions
% without an explicitly set log get a default one with a per-partition UUID
% (1000 + PartId) and sequence number 0.
-spec get_failover_log(partition_id(), #state{}) -> partition_version().
get_failover_log(PartId, State) ->
    FailoverLogs = State#state.failover_logs,
    case dict:is_key(PartId, FailoverLogs) of
    true ->
        dict:fetch(PartId, FailoverLogs);
    false ->
        [{1000 + PartId, 0}]
    end.
566
567
% Returns the current high sequence number of a partition. The snapshot
% settings are fetched from the server process, hence this must not be called
% from within the server process itself.
-spec get_sequence_number(binary(), partition_id()) ->
                                 {ok, update_seq()} |
                                 {error, not_my_partition}.
get_sequence_number(SetName, PartId) ->
    ItemsPerSnapshot = gen_server:call(?MODULE, get_items_per_snapshot),
    DupsPerSnapshot = gen_server:call(?MODULE, get_dups_per_snapshot),
    get_sequence_number(SetName, PartId, ItemsPerSnapshot,
        DupsPerSnapshot).

% Returns the high sequence number of a partition. When duplicates are
% enabled, the update sequence of the partition's database is converted with
% num_items_with_dups/3, so that it accounts for the duplicated items.
-spec get_sequence_number(binary(), partition_id(), non_neg_integer(),
                          non_neg_integer()) ->
                                 {ok, update_seq()} |
                                 {error, not_my_partition}.
get_sequence_number(SetName, PartId, ItemsPerSnapshot,
        DupsPerSnapshot) ->
    case open_db(SetName, PartId) of
    {ok, Db} ->
        Seq0 = Db#db.update_seq,
        ok = couch_db:close(Db),
        Seq = case DupsPerSnapshot > 0 of
        true ->
            num_items_with_dups(Seq0, ItemsPerSnapshot, DupsPerSnapshot);
        false ->
            Seq0
        end,
        {ok, Seq};
    {error, cannot_open_db} ->
        {error, not_my_partition}
    end.
598
599
% Returns the current number of documents of a partition (its `doc_count`),
% or `{error, not_my_partition}` if the partition's database can't be opened.
-spec get_num_items(binary(), partition_id()) -> {ok, non_neg_integer()} |
                                                 {error, not_my_partition}.
get_num_items(SetName, PartId) ->
    case open_db(SetName, PartId) of
    {error, cannot_open_db} ->
        {error, not_my_partition};
    {ok, Db} ->
        {ok, Info} = couch_db:get_db_info(Db),
        ok = couch_db:close(Db),
        {ok, couch_util:get_value(doc_count, Info)}
    end.
613
614
% Spawn a new acceptor process linked to the caller. The caller starts
% trapping exits, so the termination of a linked acceptor/reader arrives as
% an 'EXIT' message instead of killing it.
-spec accept(socket()) -> pid().
accept(Listen) ->
    process_flag(trap_exit, true),
    LoopArgs = [Listen],
    spawn_link(?MODULE, accept_loop, LoopArgs).

% Wait for one incoming connection. Once it is accepted, a fresh acceptor is
% spawned for the next connection and this process becomes the reader of the
% accepted one, so accepting never blocks on reading.
-spec accept_loop(socket()) -> ok.
accept_loop(Listen) ->
    {ok, Conn} = gen_tcp:accept(Listen),
    accept(Listen),
    read(Conn).
627
628
% Reader loop of a single connection: read a request header, dispatch it to
% the matching handler and repeat until the peer closes the connection.
% NOTE(review): receive errors other than `closed` and header types not
% listed below make this process crash.
-spec read(socket()) -> ok.
read(Socket) ->
    case gen_tcp:recv(Socket, ?DCP_HEADER_LEN) of
    {error, closed} ->
        ok;
    {ok, Header} ->
        Request = couch_dcp_producer:parse_header(Header),
        case Request of
        {open_connection, BodyLength, RequestId} ->
            handle_open_connection_body(Socket, BodyLength, RequestId);
        {stream_request, BodyLength, RequestId, PartId} ->
            handle_stream_request_body(Socket, BodyLength, RequestId, PartId);
        {failover_log, RequestId, PartId} ->
            handle_failover_log(Socket, RequestId, PartId);
        {stats, BodyLength, RequestId, PartId} ->
            handle_stats_body(Socket, BodyLength, RequestId, PartId);
        {sasl_auth, BodyLength, RequestId} ->
            handle_sasl_auth_body(Socket, BodyLength, RequestId);
        {stream_close, RequestId, PartId} ->
            handle_stream_close_body(Socket, RequestId, PartId);
        {select_bucket, BodyLength, RequestId} ->
            handle_select_bucket_body(Socket, BodyLength, RequestId);
        {control_request, BodyLength, RequestId} ->
            handle_control_request(Socket, BodyLength, RequestId);
        {buffer_ack, BodyLength, RequestId} ->
            handle_buffer_ack_request(Socket, BodyLength, RequestId);
        {all_seqs, RequestId} ->
            handle_all_seqs(Socket, RequestId)
        end,
        read(Socket)
    end.
659
% Handle a DCP control request. Only "connection_buffer_size" is supported;
% the remaining body is recorded in the server (see is_control_req/0) and the
% request is acknowledged. Any other control request crashes the reader.
-spec handle_control_request(socket(), size(), request_id()) -> ok.
handle_control_request(Socket, BodyLength, RequestId) ->
    case gen_tcp:recv(Socket, BodyLength) of
    {ok, <<"connection_buffer_size", Size/binary>>} ->
        ok = gen_server:call(?MODULE, {handle_control_req, Size})
    end,
    ControlResponse = couch_dcp_producer:encode_control_flow_ok(RequestId),
    ok = gen_tcp:send(Socket, ControlResponse).

% Handle a DCP buffer acknowledgement: count it in the server (see
% get_num_buffer_acks/0) and acknowledge it.
-spec handle_buffer_ack_request(socket(), size(), request_id()) -> ok.
handle_buffer_ack_request(Socket, BodyLength, RequestId) ->
    case gen_tcp:recv(Socket, BodyLength) of
    {ok, <<Size:?DCP_SIZES_BUFFER_SIZE>>} ->
        ok = gen_server:call(?MODULE, {handle_buffer_ack, Size})
    end,
    BufferResponse = couch_dcp_producer:encode_buffer_ack_ok(RequestId),
    ok = gen_tcp:send(Socket, BufferResponse).
677
678
% XXX vmx: 2014-01-24: Proper logging/error handling is missing
% Read the body of an open connection request and always confirm the
% connection. Sequence number, flags and connection name are ignored.
-spec handle_open_connection_body(socket(), size(), request_id()) ->
                                         ok | {error, closed}.
handle_open_connection_body(Socket, BodyLength, RequestId) ->
    case gen_tcp:recv(Socket, BodyLength) of
    {error, closed} = Closed ->
        Closed;
    {ok, <<_SeqNo:?DCP_SIZES_SEQNO,
           _Flags:?DCP_SIZES_FLAGS,
           _Name/binary>>} ->
        Reply = couch_dcp_producer:encode_open_connection(RequestId),
        ok = gen_tcp:send(Socket, Reply)
    end.
692
% Handle the body of a stream request. Answers with an ERANGE error if the
% start sequence is bigger than the end sequence, with a KEY_NOT_FOUND error
% if the partition UUID isn't in the failover log (unless streaming from 0),
% otherwise decides via send_ok_or_error/8 between opening the stream, a
% rollback or an error.
-spec handle_stream_request_body(socket(), size(), request_id(),
                                 partition_id()) -> ok | {error, closed}.
handle_stream_request_body(Socket, BodyLength, RequestId, PartId) ->
    case gen_tcp:recv(Socket, BodyLength) of
    % TODO vmx 2014-04-04: Make a rollback due to wrong SnapshotStart/End
    {ok, <<Flags:?DCP_SIZES_FLAGS,
           _Reserved:?DCP_SIZES_RESERVED,
           StartSeq:?DCP_SIZES_BY_SEQ,
           EndSeq:?DCP_SIZES_BY_SEQ,
           PartUuid:?DCP_SIZES_PARTITION_UUID,
           _SnapshotStart:?DCP_SIZES_BY_SEQ,
           _SnapshotEnd:?DCP_SIZES_BY_SEQ>>} ->
        FailoverLog = get_failover_log(PartId),
        case StartSeq > EndSeq of
        true ->
            send_error(Socket, RequestId, ?DCP_STATUS_ERANGE);
        false ->
            % The end sequence number actually streamed. `Flags` is already
            % bound, so the clauses below compare against its value.
            % NOTE(review): flag combinations not covered here raise a
            % case_clause error.
            EndSeq2 = case Flags of
            ?DCP_FLAG_NOFLAG ->
                EndSeq;
            ?DCP_FLAG_USELATEST_ENDSEQNO ->
                EndSeq;
            Flags when (Flags band ?DCP_FLAG_DISKONLY) =/= 0 ->
                % Either of the following flags:
                % DCP_FLAG_DISKONLY
                % (DCP_FLAG_DISKONLY bor DCP_FLAG_USELATEST_ENDSEQNO)
                % Only persisted items are requested, simulate that by
                % ending the stream early.
                ItemsPerSnapshot = gen_server:call(
                    ?MODULE, get_items_per_snapshot),
                case ItemsPerSnapshot of
                0 ->
                    PersistedItemsFun = gen_server:call(
                        ?MODULE, get_persisted_items_fun),
                    PersistedItemsFun(EndSeq);
                % The items per snapshot have higher priority than the
                % persisted items function
                _ ->
                    ItemsPerSnapshot
                end
            end,
            % PartVersionSeq is bound in both branches: the sequence number
            % of the failover log entry matching the client's partition UUID,
            % or 0 if the UUID is unknown.
            Found = case lists:keyfind(PartUuid, 1, FailoverLog) of
            {PartUuid, PartVersionSeq} ->
                true;
            false ->
                PartVersionSeq = 0,
                false
            end,
            % An unknown partition UUID is only accepted when streaming from
            % the very beginning.
            case Found orelse StartSeq =:= 0 of
            true ->
                send_ok_or_error(
                    Socket, RequestId, PartId, StartSeq, EndSeq2, PartUuid,
                    PartVersionSeq, FailoverLog);
            false ->
                send_error(Socket, RequestId, ?DCP_STATUS_KEY_NOT_FOUND)
            end
        end;
    {error, closed} ->
        {error, closed}
    end.
751
% Close the stream of a partition and report whether such a stream existed
% (OK) or not (KEY_NOT_FOUND).
-spec handle_stream_close_body(socket(), request_id(), partition_id()) -> ok.
handle_stream_close_body(Socket, RequestId, PartId) ->
    Status = case gen_server:call(?MODULE, {remove_stream, PartId}) of
    ok ->
        ?DCP_STATUS_OK;
    vbucket_stream_not_found ->
        ?DCP_STATUS_KEY_NOT_FOUND
    end,
    Resp = couch_dcp_producer:encode_stream_close_response(
        RequestId, Status),
    ok = gen_tcp:send(Socket, Resp).

% Read the body of a select bucket request (the bucket name is ignored) and
% always answer with success.
-spec handle_select_bucket_body(socket(), size(), request_id()) -> ok.
handle_select_bucket_body(Socket, BodyLength, RequestId) ->
    {ok, _} = gen_tcp:recv(Socket, BodyLength),
    Resp = couch_dcp_producer:encode_select_bucket_response(
        RequestId, ?DCP_STATUS_OK),
    ok = gen_tcp:send(Socket, Resp).
769
% Decide how to answer a stream request whose partition UUID is known (or
% whose start sequence is 0) and send the answer:
%  - streaming from 0 always opens the stream;
%  - same history as the client: open the stream unless the client asks for
%    items beyond the current high sequence number (ERANGE);
%  - the server has newer failover log entries: open the stream if the start
%    sequence is lower than the sequence number of the oldest newer entry,
%    otherwise roll back to `PartVersionSeq`.
-spec send_ok_or_error(socket(), request_id(), partition_id(), update_seq(),
                       update_seq(), uuid(), update_seq(),
                       partition_version()) -> ok.
send_ok_or_error(Socket, RequestId, PartId, StartSeq, EndSeq, PartUuid,
        PartVersionSeq, FailoverLog) ->
    SetName = gen_server:call(?MODULE, get_set_name),
    {ok, HighSeq} = get_sequence_number(SetName, PartId),

    Outcome = case StartSeq of
    0 ->
        ok;
    _ ->
        % The server might already have a different future than the client
        % (common history, but the server is ahead with new failover log
        % entries). The requested `StartSeq` must then be lower than the
        % sequence number of the failover log entry that comes next.
        NewerEntries = lists:takewhile(
            fun({LogPartUuid, _}) -> LogPartUuid =/= PartUuid end,
            FailoverLog),
        case NewerEntries of
        [] when StartSeq =< HighSeq ->
            ok;
        [] ->
            % The client tries to get items from the future, which means
            % that it got ahead of the server somehow.
            erange;
        _ ->
            {_, NextHighSeqNum} = lists:last(NewerEntries),
            case StartSeq < NextHighSeqNum of
            true ->
                ok;
            false ->
                rollback
            end
        end
    end,

    case Outcome of
    ok ->
        send_ok(Socket, RequestId, PartId, StartSeq, EndSeq, FailoverLog);
    erange ->
        send_error(Socket, RequestId, ?DCP_STATUS_ERANGE);
    rollback ->
        send_rollback(Socket, RequestId, PartVersionSeq)
    end.
814
% Hand the stream over to the server process. It answers on `Socket` with a
% stream request OK, or with an EEXISTS error if the partition already has an
% open stream.
-spec send_ok(socket(), request_id(), partition_id(), update_seq(),
              update_seq(), partition_version()) -> ok.
send_ok(Socket, RequestId, PartId, StartSeq, EndSeq, FailoverLog) ->
    AddStream = {add_stream, PartId, RequestId, StartSeq, EndSeq, Socket,
                 FailoverLog},
    ok = gen_server:call(?MODULE, AddStream).
820
% Answer a stream request with a rollback to `RollbackSeq`.
-spec send_rollback(socket(), request_id(), update_seq()) -> ok.
send_rollback(Socket, RequestId, RollbackSeq) ->
    ok = gen_tcp:send(Socket,
        couch_dcp_producer:encode_stream_request_rollback(
            RequestId, RollbackSeq)).

% Answer a stream request with the given error status.
-spec send_error(socket(), request_id(), dcp_status()) -> ok.
send_error(Socket, RequestId, Status) ->
    ok = gen_tcp:send(Socket,
        couch_dcp_producer:encode_stream_request_error(RequestId, Status)).
832
833
% Looks up the failover log of the requested partition and writes it, encoded
% as a failover log response, to the socket.
-spec handle_failover_log(socket(), request_id(), partition_id()) -> ok.
handle_failover_log(Socket, RequestId, PartId) ->
    Response = couch_dcp_producer:encode_failover_log(
        RequestId, get_failover_log(PartId)),
    ok = gen_tcp:send(Socket, Response).
840
841
% Reads the stat name (the request body of BodyLength bytes) from the socket
% and lets the server process answer the stat request. If the connection was
% closed, `{error, closed}` is returned instead.
-spec handle_stats_body(socket(), size(), request_id(), partition_id()) ->
                               ok | not_yet_implemented |
                               {error, closed | not_my_partition}.
handle_stats_body(Socket, BodyLength, RequestId, PartId) ->
    case gen_tcp:recv(Socket, BodyLength) of
    {error, closed} = Closed ->
        Closed;
    {ok, StatName} ->
        gen_server:call(
            ?MODULE, {send_stat, StatName, Socket, RequestId, PartId})
    end.
852
% Forwards an `all_seqs` request to the server process and returns its reply.
% The socket is passed along, so the handler (not visible here) presumably
% writes the response itself.
-spec handle_all_seqs(socket(), request_id()) -> term().
handle_all_seqs(Socket, RequestId) ->
    AllSeqsRequest = {all_seqs, Socket, RequestId},
    gen_server:call(?MODULE, AllSeqsRequest).
855
% XXX vmx: 2014-01-24: Proper logging/error handling is missing
%
% Consumes the SASL authentication body of BodyLength bytes and replies with a
% successful authentication response. If the connection was closed,
% `{error, closed}` is returned instead.
-spec handle_sasl_auth_body(socket(), size(), request_id()) ->
                                   ok | {error, closed}.
handle_sasl_auth_body(Socket, BodyLength, RequestId) ->
    case gen_tcp:recv(Socket, BodyLength) of
    {error, closed} = Closed ->
        Closed;
    % NOTE vmx 2014-01-10: Currently there's no real authentication
    % implemented in the fake server. The credentials are ignored and the
    % authentication is always reported as successful.
    {ok, _Credentials} ->
        ok = gen_tcp:send(Socket, couch_dcp_producer:encode_sasl_auth(RequestId))
    end.
870
871
% This function creates mutations for one snapshot of one partition of a
% given size.
%
% It reads the partition database of SetName/PartId and enumerates its
% documents via couch_db:enum_docs_since/5, starting at StartSeq and using
% EndSeq as `end_key`. Every document becomes a tuple
%     {Cas, Seq, RevSeq, Flags, Expiration, 0, DocId, Value}
% where Value is the atom `deleted` for deleted documents and the document
% body as a binary otherwise. The constant 0 is presumably the lock time
% expected by the consumer of these tuples (TODO confirm). The mutations are
% returned in enumeration order.
%
% The database is closed even if the enumeration crashes, so the handle is
% not leaked.
-spec create_mutations(binary(), partition_id(), update_seq(), update_seq()) ->
                              [tuple()].
create_mutations(SetName, PartId, StartSeq, EndSeq) ->
    {ok, Db} = open_db(SetName, PartId),
    DocsFun = fun(DocInfo, Acc) ->
        #doc_info{
            id = DocId,
            deleted = Deleted,
            local_seq = Seq,
            rev = Rev
        } = DocInfo,
        Value = case Deleted of
        true ->
            deleted;
        false ->
            {ok, CouchDoc} = couch_db:open_doc_int(Db, DocInfo, []),
            iolist_to_binary(CouchDoc#doc.body)
        end,
        {RevSeq, Cas, Expiration, Flags} = extract_revision(Rev),
        % Prepended for O(1) accumulation, reversed once at the end
        {ok, [{Cas, Seq, RevSeq, Flags, Expiration, 0, DocId, Value} | Acc]}
    end,
    try
        {ok, _NumDocs, Docs} = couch_db:fast_reads(Db, fun() ->
            couch_db:enum_docs_since(Db, StartSeq, DocsFun, [],
                                     [{end_key, EndSeq}])
        end),
        lists:reverse(Docs)
    after
        couch_db:close(Db)
    end.
901
902
% Extract the revision sequence number, CAS, expiration and flags out of the
% revision. The result is `{RevSeq, Cas, Expiration, Flags}`.
% The couchdb unit tests don't fill in a proper revision, but an empty binary.
% In that case CAS, expiration and flags are all reported as 0.
-spec extract_revision({non_neg_integer(), <<>> | <<_:128>>}) ->
                              {non_neg_integer(), non_neg_integer(),
                               non_neg_integer(), non_neg_integer()}.
extract_revision({RevSeq, <<>>}) ->
    {RevSeq, 0, 0, 0};
% Revision metadata layout (64-bit CAS, 32-bit expiration, 32-bit flags) as in
% https://github.com/couchbase/ep-engine/blob/master/src/couch-kvstore/couch-kvstore.cc#L212-L216
extract_revision({RevSeq, <<Cas:64, Expiration:32, Flags:32>>}) ->
    {RevSeq, Cas, Expiration, Flags}.
914
915
% Opens the database of partition PartId of the set SetName. Any failure of
% couch_db:open_int/2 is reported as `{error, cannot_open_db}`.
-spec open_db(binary(), partition_id()) ->
                     {ok, #db{}} | {error, cannot_open_db}.
open_db(SetName, PartId) ->
    DbName = ?dbname(SetName, PartId),
    case couch_db:open_int(DbName, []) of
    {ok, _PartDb} = Opened ->
        Opened;
    _ ->
        {error, cannot_open_db}
    end.
925
926
% Opens the master database of the set SetName. Any failure of
% couch_db:open_int/2 is reported as `{error, cannot_open_db}`.
-spec open_master_db(binary()) ->
                     {ok, #db{}} | {error, cannot_open_db}.
open_master_db(SetName) ->
    DbName = ?master_dbname(SetName),
    case couch_db:open_int(DbName, []) of
    {ok, _MasterDb} = Opened ->
        Opened;
    _ ->
        {error, cannot_open_db}
    end.
936
937
% Integer division that rounds up instead of truncating, e.g. ceil_div(5, 3)
% is 2 while 6 div 3 stays 2.
-spec ceil_div(non_neg_integer(), pos_integer()) -> non_neg_integer().
ceil_div(Numerator, Denominator) ->
    Quotient = Numerator div Denominator,
    case Numerator rem Denominator of
    Remainder when Remainder > 0 ->
        Quotient + 1;
    Remainder ->
        Quotient + Remainder
    end.
941
942
% Inserts duplicates of already emitted mutations into the list of mutations.
%
% A counter starts at InitCount and is incremented for every mutation. Each
% time it is a positive multiple of ItemsPerSnapshot, DupsPerSnapshot randomly
% chosen (distinct) mutations from those emitted so far are inserted before
% the current mutation. The inserted duplicates count towards the counter as
% well. The random generator is seeded with a fixed seed, so the result is
% reproducible.
%
% Throws `{error, Msg}` if DupsPerSnapshot >= ItemsPerSnapshot div 2.
-spec create_mutations_dups([tuple()], pos_integer(), pos_integer(),
                            non_neg_integer()) -> [tuple()].
create_mutations_dups(_Mutations, ItemsPerSnapshot, DupsPerSnapshot,
        _InitCount) when DupsPerSnapshot >= (ItemsPerSnapshot div 2) ->
    % Else the algorithm for creating duplicates doesn't work properly
    throw({error, <<"The number of duplicates must be lower than half of "
        "the items per snapshot">>});
create_mutations_dups(Mutations, ItemsPerSnapshot, DupsPerSnapshot,
        InitCount) ->
    % Make sure every test run leads to the same result
    random:seed(5, 6, 7),
    % The accumulator is kept in reverse order so that every step is a cons
    % instead of an O(n) append; it is reversed once at the end.
    {RevMutations, _} = lists:foldl(fun(Mutation, {RevAcc, I}) ->
        case I > 0 andalso (I rem ItemsPerSnapshot) =:= 0 of
        true ->
            % lists:usort/1 returns the candidates sorted, so keeping the
            % accumulator reversed doesn't change which ones are drawn.
            Shuffled = [X || {_, X} <- lists:sort(
                [{random:uniform(), M} || M <- lists:usort(RevAcc)])],
            RandomMutations = lists:sublist(Shuffled, DupsPerSnapshot),
            % Forward order: Acc ++ RandomMutations ++ [Mutation]
            {[Mutation | lists:reverse(RandomMutations, RevAcc)],
             I + 1 + DupsPerSnapshot};
        false ->
            {[Mutation | RevAcc], I + 1}
        end
    end, {[], InitCount}, Mutations),
    lists:reverse(RevMutations).
966
967
% Apply sequentially increasing sequence numbers to the mutations.
% The sequence number is the second element of every mutation tuple. The
% first mutation keeps its sequence number, and every following one gets the
% previous number plus one. An empty list is returned as is.
-spec apply_sequence_numbers([tuple()]) -> [tuple()].
apply_sequence_numbers([]) ->
    [];
apply_sequence_numbers([First | _] = Mutations) ->
    StartSeq = element(2, First),
    {Mutations2, _} = lists:mapfoldl(fun(M, I) ->
        {setelement(2, M, I), I + 1}
    end, StartSeq, Mutations),
    Mutations2.
976
977
% When a certain amount of items is added per snapshot, it can happen that
% the number of items overflow into a new snapshot which can keep cascading.
%
% Returns the total number of items, including the duplicates that every
% snapshot except the first one receives, for NumItems original items. The
% estimate is recomputed until adding the duplicates no longer increases the
% number of snapshots.
-spec num_items_with_dups(pos_integer(), pos_integer(), pos_integer()) ->
                                 pos_integer().
num_items_with_dups(NumItems, ItemsPerSnapshot, DupsPerSnapshot) ->
    NumSnapshots = ceil_div(NumItems, ItemsPerSnapshot),
    % The first snapshot doesn't contain duplicates hence "- 1"
    num_items_with_dups(
        NumItems + (NumSnapshots - 1) * DupsPerSnapshot,
        ItemsPerSnapshot, DupsPerSnapshot, NumSnapshots).

% CurrentNum is the item count computed for NumSnapshots snapshots. If that
% count needs more snapshots, the duplicates of the additional snapshots are
% added and the check is repeated.
-spec num_items_with_dups(pos_integer(), pos_integer(), pos_integer(),
                          pos_integer()) -> pos_integer().
num_items_with_dups(CurrentNum, ItemsPerSnapshot, DupsPerSnapshot,
        NumSnapshots) ->
    NewNumSnapshots = ceil_div(CurrentNum, ItemsPerSnapshot),
    case NewNumSnapshots =:= NumSnapshots of
    true ->
        CurrentNum;
    false ->
        NewNum = CurrentNum +
            (NewNumSnapshots - NumSnapshots) * DupsPerSnapshot,
        num_items_with_dups(
            NewNum, ItemsPerSnapshot, DupsPerSnapshot, NewNumSnapshots)
    end.
1004
% Lists the ids of the partitions of the set SetName, in ascending order and
% without duplicates.
%
% The partitions are discovered from the files next to the master database
% that are named `<PartId>.couch.<Revision>`. Other files matched by the
% wildcard (the master database, names with additional dot-separated parts or
% a non-numeric prefix) are ignored instead of crashing the lookup. A
% partition is reported only once, even if several revision files exist for
% it.
-spec list_partitions(binary()) -> [partition_id()].
list_partitions(SetName) ->
    {ok, MasterDb} = open_master_db(SetName),
    DbDir = filename:dirname(MasterDb#db.filepath),
    ok = couch_db:close(MasterDb),
    FilePaths = filelib:wildcard(filename:join([DbDir, "*.couch.[0-9]*"])),
    Parts = lists:filtermap(fun(Path) ->
        File = filename:basename(Path),
        case string:tokens(File, ".") of
        [ListPartId, "couch", _Revision] ->
            % Also rejects "master" as it isn't an integer
            case string:to_integer(ListPartId) of
            {PartId, []} ->
                {true, PartId};
            _ ->
                false
            end;
        _ ->
            false
        end
    end, FilePaths),
    lists:usort(Parts).
1021
% Writes the `vb_<PartId>:high_seqno` and `vb_<PartId>:vb_uuid` stats of every
% given partition to the socket, followed by the empty stat that terminates a
% stats response.
%
% The high seqno comes from get_sequence_number/4, and the uuid is the one of
% the first failover log entry. Partitions for which get_sequence_number/4
% returns `{error, not_my_partition}` are skipped silently. SetName must be
% equal to the set name stored in State, otherwise the match below crashes.
-spec send_vbucket_seqnos_stats(#state{}, binary(),
        socket(), request_id(), [partition_id()]) -> ok.
send_vbucket_seqnos_stats(State, SetName, Socket, RequestId, Partitions) ->
    #state{
        setname = SetName,
        items_per_snapshot = ItemsPerSnapshot,
        dups_per_snapshot = DupsPerSnapshot
    } = State,
    lists:foreach(fun(PartId) ->
        BinPartId = list_to_binary(integer_to_list(PartId)),
        case get_sequence_number(SetName, PartId, ItemsPerSnapshot,
            DupsPerSnapshot) of
        {ok, Seq} ->
            SeqKey = <<"vb_", BinPartId/binary, ":high_seqno">>,
            SeqValue = list_to_binary(integer_to_list(Seq)),
            SeqStat = couch_dcp_producer:encode_stat(
                RequestId, SeqKey, SeqValue),
            ok = gen_tcp:send(Socket, SeqStat),

            UuidKey = <<"vb_", BinPartId/binary, ":vb_uuid">>,
            [{UuidValue, _} | _] = get_failover_log(PartId, State),
            UuidStat = couch_dcp_producer:encode_stat(
                RequestId, UuidKey, <<UuidValue:64/integer>>),
            ok = gen_tcp:send(Socket, UuidStat);
        {error, not_my_partition} ->
            % TODO sarath 2014-07-15: Fix get_stats API for single partition
            % The real response contains the vBucket map so that
            % clients can adapt. It's not easy to simulate, hence nothing is
            % sent for such a partition. A more faithful fake would send a
            % stat error built with couch_dcp_producer:encode_stat_error/3
            % and ?DCP_STATUS_NOT_MY_VBUCKET, with an empty JSON object
            % (<<"{}">>) as the body.
            ok
        end
    end, Partitions),
    EndStat = couch_dcp_producer:encode_stat(RequestId, <<>>, <<>>),
    ok = gen_tcp:send(Socket, EndStat).
1067