1#!/usr/bin/env escript
2%% -*- erlang -*-
3%%! -smp enable
4
5% Licensed under the Apache License, Version 2.0 (the "License"); you may not
6% use this file except in compliance with the License. You may obtain a copy of
7% the License at
8%
9%   http://www.apache.org/licenses/LICENSE-2.0
10%
11% Unless required by applicable law or agreed to in writing, software
12% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14% License for the specific language governing permissions and limitations under
15% the License.
16
17-include_lib("couch_dcp/include/couch_dcp.hrl").
18
19test_set_name() -> <<"couch_test_couch_dcp_client">>.
20num_set_partitions() -> 4.
21num_docs() -> 1000.
22num_docs_pp() -> num_docs() div num_set_partitions().
23
24-define(DCP_MSG_SIZE_MUTATION, 55).
25-define(DCP_MSG_SIZE_DELETION, 42).
26-define(DCP_MSG_SIZE_SNAPSHOT , 44).
27-define(DCP_MSG_SIZE_STREAM_END, 28).
28
29main(_) ->
30    test_util:init_code_path(),
31
32    etap:plan(56),
33    case (catch test()) of
34        ok ->
35            etap:end_tests();
36        Other ->
37            etap:diag(io_lib:format("Test died abnormally: ~p", [Other])),
38            etap:bail(Other)
39    end,
40    %init:stop(),
41    %receive after infinity -> ok end,
42    ok.
43
44
45test() ->
46    couch_set_view_test_util:start_server(test_set_name()),
47    setup_test(),
48
49    tests(),
50    test_close_during_request(),
51
52    couch_set_view_test_util:stop_server(),
53    ok.
54
55tests() ->
56    % Populate failover log
57    FailoverLogs = lists:map(fun(PartId) ->
58        FailoverLog = [
59            {10001, PartId + 3}, {10002, PartId + 2}, {10003, 0}],
60        couch_dcp_fake_server:set_failover_log(PartId, FailoverLog),
61        FailoverLog
62    end, lists:seq(0, num_set_partitions() - 1)),
63
64    TestFun = fun(Item, Acc) ->
65        case Item of
66        {snapshot_marker, _} ->
67            Acc;
68        {part_versions, _} ->
69            Acc;
70        _ ->
71            Acc ++ [Item]
72        end
73    end,
74
75    {auth, User, Passwd} = cb_auth_info:get(),
76    {ok, Pid} = couch_dcp_client:start(
77        test_set_name(), test_set_name(), User, Passwd, 1024, 0),
78
79    % Get the latest partition version first
80    {ok, InitialFailoverLog0} = couch_dcp_client:get_failover_log(Pid, 0),
81    etap:is(InitialFailoverLog0, hd(FailoverLogs), "Failover log is correct"),
82
83    % First parameter is the partition, the second is the sequence number
84    % to start at.
85    {ok, Docs1, FailoverLog1} = couch_dcp_client:enum_docs_since(
86        Pid, 0, InitialFailoverLog0, 4, 10, ?DCP_FLAG_NOFLAG, TestFun, []),
87    etap:is(length(Docs1), 6, "Correct number of docs (6) in partition 0"),
88    etap:is(FailoverLog1, lists:nth(1, FailoverLogs),
89        "Failoverlog from partition 0 is correct"),
90
91    {ok, InitialFailoverLog1} = couch_dcp_client:get_failover_log(Pid, 1),
92    {ok, Docs2, FailoverLog2} = couch_dcp_client:enum_docs_since(
93        Pid, 1, InitialFailoverLog1, 46, 165, ?DCP_FLAG_NOFLAG, TestFun, []),
94    etap:is(length(Docs2), 119, "Correct number of docs (109) partition 1"),
95    etap:is(FailoverLog2, lists:nth(2, FailoverLogs),
96        "Failoverlog from partition 1 is correct"),
97
98    {ok, InitialFailoverLog2} = couch_dcp_client:get_failover_log(Pid, 2),
99    {ok, Docs3, FailoverLog3} = couch_dcp_client:enum_docs_since(
100        Pid, 2, InitialFailoverLog2, 80, num_docs() div num_set_partitions(),
101        ?DCP_FLAG_NOFLAG, TestFun, []),
102    Expected3 = (num_docs() div num_set_partitions()) - 80,
103    etap:is(length(Docs3), Expected3,
104        io_lib:format("Correct number of docs (~p) partition 2", [Expected3])),
105    etap:is(FailoverLog3, lists:nth(3, FailoverLogs),
106        "Failoverlog from partition 2 is correct"),
107
108    {ok, InitialFailoverLog3} = couch_dcp_client:get_failover_log(Pid, 3),
109    {ok, Docs4, FailoverLog4} = couch_dcp_client:enum_docs_since(
110        Pid, 3, InitialFailoverLog3, 0, 5, ?DCP_FLAG_NOFLAG, TestFun, []),
111    etap:is(length(Docs4), 5, "Correct number of docs (5) partition 3"),
112    etap:is(FailoverLog4, lists:nth(4, FailoverLogs),
113        "Failoverlog from partition 3 is correct"),
114
115    % Try a too high sequence number to get a erange error response
116    {error, ErangeError} = couch_dcp_client:enum_docs_since(
117        Pid, 0, InitialFailoverLog0, 400, 450, ?DCP_FLAG_NOFLAG, TestFun, []),
118    etap:is(ErangeError, wrong_start_sequence_number,
119        "Correct error message for too high sequence number"),
120
121    % Start sequence is bigger than end sequence
122    {_Request, ErangeError2} =
123        couch_dcp_client:add_stream(
124            Pid, 0, first_uuid(InitialFailoverLog0), 5, 2,
125            ?DCP_FLAG_NOFLAG),
126    etap:is(ErangeError2, {error, wrong_start_sequence_number},
127        "Correct error message for start sequence > end sequence"),
128
129    Error = couch_dcp_client:enum_docs_since(
130        Pid, 1, [{4455667788, 1243}], 46, 165, ?DCP_FLAG_NOFLAG, TestFun, []),
131    etap:is(Error, {rollback, 0},
132        "Correct error for wrong failover log"),
133
134    {ok, [Seq0]} = couch_dcp_client:get_seqs(Pid, [0]),
135    etap:is(Seq0, {0, num_docs() div num_set_partitions()},
136        "Sequence number of partition 0 is correct"),
137    {ok, [Seq1]} = couch_dcp_client:get_seqs(Pid, [1]),
138    etap:is(Seq1, {1, num_docs() div num_set_partitions()},
139        "Sequence number of partition 1 is correct"),
140    {ok, [Seq2]} = couch_dcp_client:get_seqs(Pid, [2]),
141    etap:is(Seq2, {2, num_docs() div num_set_partitions()},
142         "Sequence number of partition 2 is correct"),
143    {ok, [Seq3]} = couch_dcp_client:get_seqs(Pid, [3]),
144    etap:is(Seq3, {3, num_docs() div num_set_partitions()},
145        "Sequence number of partition 3 is correct"),
146    SeqMissing = couch_dcp_client:get_seqs(Pid, [100000]),
147    etap:is(SeqMissing, {ok, []},
148        "Too high partition number returns correct error"),
149    SeqAll = couch_dcp_client:get_seqs(Pid, nil),
150    etap:is(SeqAll, {ok, [Seq0, Seq1, Seq2, Seq3]},
151        "Returned all partition seqs correctly"),
152
153
154    % Test snapshot markers types
155
156    TestSnapshotFun = fun(Item, Acc) ->
157        case Item of
158        {snapshot_marker, Marker} ->
159            Acc ++ [Marker];
160        {part_versions, _} ->
161            Acc;
162        _ ->
163            Acc
164        end
165    end,
166
167    SnapshotStart1 = 0,
168    SnapshotEnd1 = num_docs() div (num_set_partitions() * 2),
169    {ok, Markers1, SnapshotFailoverLog1} = couch_dcp_client:enum_docs_since(
170        Pid, 2, [{0, 0}], SnapshotStart1, SnapshotEnd1, ?DCP_FLAG_NOFLAG,
171        TestSnapshotFun, []),
172    ExpectedMarkers1 = [{SnapshotStart1, SnapshotEnd1,
173        ?DCP_SNAPSHOT_TYPE_DISK}],
174    etap:is(Markers1, ExpectedMarkers1,
175        "Received one on-disk snapshot marker"),
176
177    SnapshotStart2 = num_docs() div (num_set_partitions() * 2),
178    SnapshotEnd2 = num_docs() div num_set_partitions(),
179    {ok, Markers2, SnapshotFailoverLog2} = couch_dcp_client:enum_docs_since(
180        Pid, 2, SnapshotFailoverLog1, SnapshotStart2, SnapshotEnd2,
181        ?DCP_FLAG_NOFLAG, TestSnapshotFun, []),
182    ExpectedMarkers2 = [{SnapshotStart2, SnapshotEnd2,
183        ?DCP_SNAPSHOT_TYPE_MEMORY}],
184    etap:is(Markers2, ExpectedMarkers2,
185        "Received one in-memory snapshot marker"),
186
187
188    % Test multiple snapshots
189
190    TestAllFun = fun(Item, Acc) -> Acc ++ [Item] end,
191
192    ItemsPerSnapshot = 30,
193    couch_dcp_fake_server:set_items_per_snapshot(ItemsPerSnapshot),
194    SnapshotStart3 = 0,
195    SnapshotEnd3 = num_docs() div num_set_partitions(),
196    {ok, All3, SnapshotFailoverLog3} = couch_dcp_client:enum_docs_since(
197        Pid, 2, [{0, 0}], SnapshotStart3, SnapshotEnd3, ?DCP_FLAG_NOFLAG,
198        TestAllFun, []),
199    Markers3 = [M || {snapshot_marker, M} <- All3],
200    ExpectedMarkers3 = [{0, ItemsPerSnapshot, ?DCP_SNAPSHOT_TYPE_DISK}] ++
201        lists:map(fun(I) ->
202            {I, min(I + ItemsPerSnapshot, SnapshotEnd3),
203                ?DCP_SNAPSHOT_TYPE_MEMORY}
204        end, lists:seq(ItemsPerSnapshot, SnapshotEnd3 - 1, ItemsPerSnapshot)),
205    etap:is(Markers3, ExpectedMarkers3,
206        "Received the expected snapshot markers"),
207
208    Mutations3 = [M || #dcp_doc{} = M <- All3],
209    couch_dcp_fake_server:set_items_per_snapshot(0),
210    {ok, ExpectedMutations3, SnapshotFailoverLog3} =
211            couch_dcp_client:enum_docs_since(
212        Pid, 2, [{0, 0}], SnapshotStart3, SnapshotEnd3, ?DCP_FLAG_NOFLAG,
213        TestFun, []),
214    etap:is(Mutations3, ExpectedMutations3,
215        "Received the expected mutations within the several snapshots"),
216
217
218    % Test duplicated items in multiple snapshots
219
220    ItemsPerSnapshot2 = 30,
221    DupsPerSnapshot2 = 4,
222    couch_dcp_fake_server:set_items_per_snapshot(ItemsPerSnapshot2),
223    couch_dcp_fake_server:set_dups_per_snapshot(DupsPerSnapshot2),
224    DupsStart2 = 0,
225    DupsEnd2 = couch_dcp_fake_server:num_items_with_dups(
226        num_docs_pp(), ItemsPerSnapshot2, DupsPerSnapshot2),
227
228    {ok, AllDups2, DupsFailoverLog2} = couch_dcp_client:enum_docs_since(
229        Pid, 2, [{0, 0}], DupsStart2, DupsEnd2, ?DCP_FLAG_NOFLAG,
230        TestAllFun, []),
231    DupsMutations2 = [M || #dcp_doc{} = M <- AllDups2],
232    DupsMarkers2 = [M || {snapshot_marker, M} <- AllDups2],
233    etap:is(length(DupsMutations2), DupsEnd2,
234        "received the expected number of mutations (incl. duplicates)"),
235    etap:is(length(DupsMarkers2),
236        couch_dcp_fake_server:ceil_div(DupsEnd2, ItemsPerSnapshot2),
237        "received the expected number of snapshots"),
238    DupsUnique2 = lists:ukeysort(#dcp_doc.id, DupsMutations2),
239    DupsUniqueIds2 = [Id || #dcp_doc{id = Id} <- DupsUnique2],
240    MutationsIds2 = [Id || #dcp_doc{id = Id} <- Mutations3],
241    etap:is(DupsUniqueIds2, MutationsIds2,
242        "received the expected mutations when de-duplicated"),
243    couch_dcp_fake_server:set_items_per_snapshot(0),
244    couch_dcp_fake_server:set_dups_per_snapshot(0),
245
246
247    % Test multiple streams in parallel
248    {StreamReq0, {failoverlog, InitialFailoverLog0}} =
249        couch_dcp_client:add_stream(
250            Pid, 0, first_uuid(InitialFailoverLog0), 10, 100,
251            ?DCP_FLAG_NOFLAG),
252
253    {StreamReq1, {failoverlog, InitialFailoverLog1}} =
254        couch_dcp_client:add_stream(
255            Pid, 1, first_uuid(InitialFailoverLog1), 100, 200,
256            ?DCP_FLAG_NOFLAG),
257
258    {StreamReq2, {failoverlog, InitialFailoverLog2}} =
259        couch_dcp_client:add_stream(
260            Pid, 2, first_uuid(InitialFailoverLog2), 0, 10, ?DCP_FLAG_NOFLAG),
261
262    [MutationsPart0, MutationsPart1, MutationsPart2] = read_mutations(
263                    Pid, [StreamReq0, StreamReq1, StreamReq2], [[], [], []]),
264
265
266    etap:is(is_same_partition(0, MutationsPart0), true,
267        "Stream0 has only partition0 mutations"),
268    etap:is(is_same_partition(1, MutationsPart1), true,
269        "Stream1 has only partition1 mutations"),
270    etap:is(is_same_partition(2, MutationsPart2), true,
271        "Stream2 has only partition2 mutations"),
272
273    etap:is(length(MutationsPart0), 90,
274        "Stream0 has 90 mutations"),
275    etap:is(length(MutationsPart1), 100,
276        "Stream1 has 100 mutations"),
277    etap:is(length(MutationsPart2), 10,
278        "Stream2 has 10 mutations"),
279
280    StreamList0 = couch_dcp_client:list_streams(Pid),
281    etap:is(StreamList0, [], "Stream list is empty"),
282
283    couch_dcp_fake_server:pause_mutations(),
284    {StreamReq0_2, {failoverlog, InitialFailoverLog0}} =
285        couch_dcp_client:add_stream(
286            Pid, 0, first_uuid(InitialFailoverLog0), 1, 100, ?DCP_FLAG_NOFLAG),
287
288    {_, StreamResp0_3} =
289        couch_dcp_client:add_stream(
290            Pid, 0, first_uuid(InitialFailoverLog0), 10, 100,
291            ?DCP_FLAG_NOFLAG),
292    etap:is(StreamResp0_3, {error,vbucket_stream_already_exists},
293        "Stream for vbucket 0 already exists"),
294    couch_dcp_fake_server:continue_mutations(),
295
296    % Drain all mutations
297    read_mutations(Pid, [StreamReq0_2], [[]]),
298
299    couch_dcp_fake_server:pause_mutations(),
300    couch_dcp_client:add_stream(
301        Pid, 1, first_uuid(InitialFailoverLog1), 10, 300, ?DCP_FLAG_NOFLAG),
302
303    couch_dcp_client:add_stream(
304        Pid, 2, first_uuid(InitialFailoverLog2), 100, 200, ?DCP_FLAG_NOFLAG),
305
306    StreamList1 = couch_dcp_client:list_streams(Pid),
307    etap:is(StreamList1, [1,2], "Stream list contains parititon 1,2"),
308
309    StreamRemoveResp0 = couch_dcp_client:remove_stream(Pid, 1),
310    StreamList2 = couch_dcp_client:list_streams(Pid),
311    etap:is({StreamRemoveResp0, StreamList2}, {ok, [2]},
312        "Removed parititon stream 1 and parition stream 2 is left"),
313
314    StreamRemoveResp1 = couch_dcp_client:remove_stream(Pid, 1),
315    etap:is(StreamRemoveResp1, {error, vbucket_stream_not_found},
316        "Correct error on trying to remove non-existing stream"),
317    couch_dcp_fake_server:continue_mutations(),
318
319    % Test with too large failover log
320    TooLargeFailoverLog = [{I, I} ||
321        I <- lists:seq(0, ?DCP_MAX_FAILOVER_LOG_SIZE)],
322    PartId = 1,
323    couch_dcp_fake_server:set_failover_log(PartId, TooLargeFailoverLog),
324    TooLargeError = couch_dcp_client:enum_docs_since(
325          Pid, PartId, [{0, 0}], 0, 100, ?DCP_FLAG_NOFLAG, TestFun, []),
326    etap:is(TooLargeError, {error, too_large_failover_log},
327        "Too large failover log returns correct error"),
328
329    gen_server:call(Pid, reset_buffer_size),
330    % Remove existing streams
331    couch_dcp_client:remove_stream(Pid, 0),
332    couch_dcp_client:remove_stream(Pid, 1),
333    couch_dcp_client:remove_stream(Pid, 2),
334
335
336    % Tests for flow control
337    couch_dcp_fake_server:pause_mutations(),
338
339    {StreamReq0_4, _} = couch_dcp_client:add_stream(
340        Pid, 0, first_uuid(InitialFailoverLog0), 0, 500, ?DCP_FLAG_NOFLAG),
341
342    gen_server:call(Pid, reset_buffer_size),
343    Ret = couch_dcp_fake_server:is_control_req(),
344
345    etap:is(Ret, true, "Buffer control request sent"),
346
347    NumBufferAck = couch_dcp_fake_server:get_num_buffer_acks(),
348
349    % Fill the client buffer by asking server to send 1024 bytes data
350    try_until_throttled(Pid, StreamReq0_4, 1000, 1024),
351
352    % Consume 200 bytes from the client
353    try_until_unthrottled(Pid, StreamReq0_4, 0, 200),
354
355    NumBufferAck2 = couch_dcp_fake_server:get_num_buffer_acks(),
356    etap:is(NumBufferAck2, NumBufferAck, "Not sent the buffer ack"),
357
358    % Consume More data so that is greater then 20 % of 1024 i.e.204.
359    % when data is 20% consumed, client sends the buffer ack to increase
360    % the flow control buffer.
361    try_until_unthrottled(Pid, StreamReq0_4, 0, 200),
362    timer:sleep(2),
363    NumBufferAck3 = couch_dcp_fake_server:get_num_buffer_acks(),
364    etap:is(NumBufferAck3, NumBufferAck + 1, "Got the buffer ack"),
365    couch_dcp_client:remove_stream(Pid, 0),
366
367    couch_dcp_fake_server:pause_mutations(),
368    {StreamReq1_2, _} = couch_dcp_client:add_stream(
369        Pid, 1, first_uuid(InitialFailoverLog1), 0, 500, ?DCP_FLAG_NOFLAG),
370
371    ReqPid = spawn(fun() ->
372        couch_dcp_client:get_stream_event(Pid, StreamReq1_2)
373        end),
374
375    EvResponse = couch_dcp_client:get_stream_event(Pid, StreamReq1_2),
376    etap:is(EvResponse, {error, event_request_already_exists},
377        "Error message received when requested multiple times for same stream's event"),
378
379    exit(ReqPid, shutdown),
380    couch_dcp_client:remove_stream(Pid, 1),
381    gen_server:call(Pid, flush_old_streams_meta),
382
383    {StreamReq0_5, _} = couch_dcp_client:add_stream(Pid, 2,
384        first_uuid(InitialFailoverLog2), 0, 500, ?DCP_FLAG_NOFLAG),
385
386    % Connection close and reconnection tests
387    ok = couch_dcp_fake_server:send_single_mutation(),
388    {snapshot_mutation, Mutation1} = couch_dcp_client:get_stream_event(Pid, StreamReq0_5),
389    #dcp_doc{seq = SeqNo1} = Mutation1,
390    OldSocket = gen_server:call(Pid, get_socket),
391    ok = couch_dcp_fake_server:close_connection(2),
392    % wait for sometime to do reconnect from couch_dcp client
393    timer:sleep(4000),
394    NewSocket = gen_server:call(Pid, get_socket),
395    etap:isnt(OldSocket, NewSocket, "Socket changed"),
396    ok = couch_dcp_fake_server:send_single_mutation(),
397    ErrorResp = couch_dcp_client:get_stream_event(Pid, StreamReq0_5),
398    etap:is({error, dcp_conn_closed}, ErrorResp, "Got the error response after connection close"),
399    EmptyStreamList = couch_dcp_client:list_streams(Pid),
400    etap:is([], EmptyStreamList, "Stream is correctly removed after connection close"),
401
402    ok = couch_dcp_fake_server:close_connection(nil),
403    % wait for sometime to do reconnect from couch_dcp client
404    timer:sleep(4000),
405    couch_dcp_client:remove_stream(Pid, 2),
406    couch_dcp_fake_server:continue_mutations(),
407
408    % Test get_num_items
409
410    {ok, NumItems0} = couch_dcp_client:get_num_items(Pid, 0),
411    etap:is(NumItems0, num_docs() div num_set_partitions(),
412        "Number of items of partition 0 is correct"),
413    {ok, NumItems1} = couch_dcp_client:get_num_items(Pid, 1),
414    etap:is(NumItems1, num_docs() div num_set_partitions(),
415        "Number of items of partition 1 is correct"),
416    {ok, NumItems2} = couch_dcp_client:get_num_items(Pid, 2),
417    etap:is(NumItems2, num_docs() div num_set_partitions(),
418         "Number of items of partition 2 is correct"),
419    {ok, NumItems3} = couch_dcp_client:get_num_items(Pid, 3),
420    etap:is(NumItems3, num_docs() div num_set_partitions(),
421        "Number of items of partition 3 is correct"),
422    NumItemsError = couch_dcp_client:get_num_items(Pid, 100000),
423    etap:is(NumItemsError, {error, not_my_vbucket},
424        "Too high partition number returns correct error"),
425
426    DocsPerPartition = num_docs() div num_set_partitions(),
427    NumDelDocs0 = num_docs() div (num_set_partitions() * 2),
428    delete_docs(1, NumDelDocs0),
429    {ok, NumItemsDel0} = couch_dcp_client:get_num_items(Pid, 0),
430    etap:is(NumItemsDel0, DocsPerPartition - NumItemsDel0,
431        "Number of items of partition 0 after some deletions is correct"),
432    NumDelDocs3 = num_docs() div (num_set_partitions() * 4),
433    delete_docs(3 * DocsPerPartition + 1, NumDelDocs3),
434    {ok, NumItemsDel3} = couch_dcp_client:get_num_items(Pid, 3),
435    etap:is(NumItemsDel3, DocsPerPartition - NumDelDocs3,
436        "Number of items of partition 3 after some deletions is correct"),
437
438    % Tests for requesting persisted items only
439
440    couch_dcp_fake_server:set_persisted_items_fun(fun(Seq) -> Seq  end),
441    {ok, [{1, HighSeq1}]} = couch_dcp_client:get_seqs(Pid, [1]),
442    {ok, ExpectedDocs1, _} =
443        couch_dcp_client:enum_docs_since(
444            Pid, 0, InitialFailoverLog0, 0, HighSeq1, ?DCP_FLAG_NOFLAG,
445            TestFun, []),
446    {ok, PersistedDocs1, _} =
447        couch_dcp_client:enum_docs_since(
448            Pid, 0, InitialFailoverLog0, 0, HighSeq1, ?DCP_FLAG_DISKONLY,
449            TestFun, []),
450    etap:is(PersistedDocs1, ExpectedDocs1,
451        "The persisted sequence number is correct, seq"),
452
453    couch_dcp_fake_server:set_persisted_items_fun(
454       fun(Seq) -> Seq div 2 end),
455    {ok, [{1, HighSeq2}]} = couch_dcp_client:get_seqs(Pid, [1]),
456    {ok, ExpectedDocs2, _} =
457        couch_dcp_client:enum_docs_since(
458            Pid, 0, InitialFailoverLog0, 0, HighSeq2 div 2, ?DCP_FLAG_NOFLAG,
459            TestFun, []),
460    {ok, PersistedDocs2, _} =
461        couch_dcp_client:enum_docs_since(
462            Pid, 0, InitialFailoverLog0, 0, HighSeq2, ?DCP_FLAG_DISKONLY,
463            TestFun, []),
464    etap:is(PersistedDocs2, ExpectedDocs2,
465        "The persisted sequence number is correct, seq/2"),
466
467    couch_dcp_fake_server:set_persisted_items_fun(fun(Seq) -> Seq - 1 end),
468    {ok, [{1, HighSeq3}]} = couch_dcp_client:get_seqs(Pid, [1]),
469    {ok, ExpectedDocs3, _} =
470        couch_dcp_client:enum_docs_since(
471            Pid, 0, InitialFailoverLog0, 0, HighSeq3 - 1, ?DCP_FLAG_NOFLAG,
472            TestFun, []),
473    {ok, PersistedDocs3, _} =
474        couch_dcp_client:enum_docs_since(
475            Pid, 0, InitialFailoverLog0, 0, HighSeq3, ?DCP_FLAG_DISKONLY,
476            TestFun, []),
477    etap:is(PersistedDocs3, ExpectedDocs3,
478        "The persisted sequence number is correct, seq - 1"),
479    ok.
480
481% Test robustness when the connection is dropped during a get all sequence
482% numbers or stats request
483% This is a regression test for MB-15922 and MB-17026
484test_close_during_request() ->
485    %timer:sleep(1),
486    %couch_set_view_test_util:start_server(test_set_name()),
487    %setup_test(),
488
489    {auth, User, Passwd} = cb_auth_info:get(),
490    {ok, Pid} = couch_dcp_client:start(
491        test_set_name(), test_set_name(), User, Passwd, 1024, 0),
492
493    ParentPid = self(),
494
495    % Get sequencenumbers request
496    spawn(fun() ->
497        couch_dcp_fake_server:close_on_next(),
498        Closed = couch_dcp_client:get_seqs(Pid, [0]),
499        ParentPid ! {ok, Closed}
500    end),
501    receive
502    {ok, Closed} ->
503        etap:is(Closed, {error, dcp_conn_closed},
504            "The connection got (as expected) closed "
505            "during the get sequence numbers request")
506    after 10000 ->
507         etap:bail("Cannot get sequence number on time, the DCP client hangs")
508    end,
509
510    % Stats request
511    spawn(fun() ->
512        couch_dcp_fake_server:close_on_next(),
513        catch couch_dcp_client:get_num_items(Pid, 0),
514        ParentPid ! ok
515    end),
516    receive
517    ok ->
518        etap:ok(true,
519            "The connection got (as expected) closed during the stats request")
520    after 10000 ->
521         etap:bail("Cannot get stats on time, the DCP client hangs")
522    end,
523    ok.
524
525
526try_until_throttled(Pid, ReqId, N, MaxSize) when N > 0 ->
527    ok = couch_dcp_fake_server:send_single_mutation(),
528    timer:sleep(1),
529    Size2 = gen_server:call(Pid, {get_buffer_size, ReqId}),
530    if 
531    MaxSize > Size2 ->
532        try_until_throttled(Pid, ReqId, N - 1, MaxSize);
533    true ->
534        ok
535    end.
536
537try_until_unthrottled(Pid, ReqId, Size, MaxSize) ->
538    Data = couch_dcp_client:get_stream_event(Pid, ReqId),
539    Size2 = Size + get_event_size(Data), 
540    if
541    Size2 < MaxSize ->
542        try_until_unthrottled(Pid, ReqId, Size2, MaxSize);
543    true ->
544        ok
545    end.
546
547setup_test() ->
548    couch_set_view_test_util:delete_set_dbs(test_set_name(), num_set_partitions()),
549    couch_set_view_test_util:create_set_dbs(test_set_name(), num_set_partitions()),
550    populate_set().
551
552is_same_partition(PartId, Docs) ->
553    lists:all(fun({_, #dcp_doc{partition = P}}) ->
554        P =:= PartId
555    end, Docs).
556
557read_streams(Pid, StreamReqs) ->
558    lists:foldr(fun(Request, Acc) ->
559        case couch_dcp_client:get_stream_event(Pid, Request) of
560        {error, vbucket_stream_not_found} ->
561            [drained | Acc];
562        {Optype, _} = Item ->
563            case Optype of
564            stream_end ->
565                [[] | Acc];
566            snapshot_marker ->
567                [[] | Acc];
568            _ ->
569                [[Item] | Acc]
570            end;
571        _ ->
572            [[] | Acc]
573        end
574    end, [], StreamReqs).
575
576read_mutations(Pid, StreamReqs, Acc) ->
577    Items = read_streams(Pid, StreamReqs),
578    case lists:all(fun(Item) -> Item =:= drained end, Items) of
579    true ->
580        Acc;
581    false ->
582        Acc2 = lists:zipwith(fun(Item, Acc0) ->
583            case Item of
584            drained ->
585                Acc0;
586            _ ->
587                Item ++ Acc0
588            end
589        end, Items, Acc),
590        read_mutations(Pid, StreamReqs, Acc2)
591    end.
592
593doc_id(I) ->
594    iolist_to_binary(io_lib:format("doc_~8..0b", [I])).
595
596create_docs(From, To) ->
597    lists:map(
598        fun(I) ->
599            Cas = I,
600            ExpireTime = 0,
601            Flags = 0,
602            RevMeta1 = <<Cas:64/native, ExpireTime:32/native, Flags:32/native>>,
603            RevMeta2 = [[io_lib:format("~2.16.0b",[X]) || <<X:8>> <= RevMeta1 ]],
604            RevMeta3 = iolist_to_binary(RevMeta2),
605            {[
606              {<<"meta">>, {[
607                             {<<"id">>, doc_id(I)},
608                             {<<"rev">>, <<"1-", RevMeta3/binary>>}
609                            ]}},
610              {<<"json">>, {[{<<"value">>, I}]}}
611            ]}
612        end,
613        lists:seq(From, To)).
614
615populate_set() ->
616    etap:diag("Populating the " ++ integer_to_list(num_set_partitions()) ++
617        " databases with " ++ integer_to_list(num_docs()) ++ " documents"),
618    DocList = create_docs(1, num_docs()),
619    ok = couch_set_view_test_util:populate_set_sequentially(
620        test_set_name(),
621        lists:seq(0, num_set_partitions() - 1),
622        DocList).
623
624delete_docs(StartId, NumDocs) ->
625    Dbs = dict:from_list(lists:map(
626        fun(I) ->
627            {ok, Db} = couch_set_view_test_util:open_set_db(
628                test_set_name(), I),
629            {I, Db}
630        end,
631        lists:seq(0, num_set_partitions() - 1))),
632    Docs = lists:foldl(
633        fun(I, Acc) ->
634            Doc = couch_doc:from_json_obj({[
635                {<<"meta">>, {[{<<"deleted">>, true}, {<<"id">>, doc_id(I)}]}},
636                {<<"json">>, {[]}}
637            ]}),
638            DocsPerPartition = num_docs() div num_set_partitions(),
639            DocList = case orddict:find(I div DocsPerPartition, Acc) of
640            {ok, L} ->
641                L;
642            error ->
643                []
644            end,
645            orddict:store(I div DocsPerPartition, [Doc | DocList], Acc)
646        end,
647        orddict:new(), lists:seq(StartId, StartId + NumDocs - 1)),
648    [] = orddict:fold(
649        fun(I, DocList, Acc) ->
650            Db = dict:fetch(I, Dbs),
651            etap:diag("Deleting " ++ integer_to_list(length(DocList)) ++
652                " documents from partition " ++ integer_to_list(I)),
653            ok = couch_db:update_docs(Db, DocList, [sort_docs]),
654            Acc
655        end,
656        [], Docs),
657    ok = lists:foreach(fun({_, Db}) ->
658        ok = couch_db:close(Db)
659    end, dict:to_list(Dbs)).
660
661
662first_uuid(FailoverLog) ->
663    element(1, hd(FailoverLog)).
664
665receive_mutation(0, _, _) ->
666    ok;
667receive_mutation(Count, Pid, Stream) ->
668    couch_dcp_fake_server:send_single_mutation(),
669    couch_dcp_client:get_stream_event(Pid, Stream),
670    receive_mutation(Count - 1, Pid, Stream).
671
672get_event_size({Type, Doc}) ->
673    case Type of
674        snapshot_mutation ->
675            #dcp_doc {
676                id = Key,
677                body = Value
678            } = Doc,
679            ?DCP_MSG_SIZE_MUTATION + erlang:external_size(Key) + erlang:external_size(Value);
680        stream_end ->
681            ?DCP_MSG_SIZE_STREAM_END;
682        snapshot_marker ->
683            ?DCP_MSG_SIZE_SNAPSHOT;
684        snapshot_deletion ->
685            #dcp_doc {
686                id = Key
687            } = Doc,
688           ?DCP_MSG_SIZE_DELETION + erlang:external_size(Key)
689    end.
690