1#!/usr/bin/env escript
2%% -*- erlang -*-
3%%! -smp enable
4
5% Licensed under the Apache License, Version 2.0 (the "License"); you may not
6% use this file except in compliance with the License. You may obtain a copy of
7% the License at
8%
9%   http://www.apache.org/licenses/LICENSE-2.0
10%
11% Unless required by applicable law or agreed to in writing, software
12% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14% License for the specific language governing permissions and limitations under
15% the License.
16
17-include_lib("couch_dcp/include/couch_dcp.hrl").
18
19test_set_name() -> <<"couch_test_couch_dcp_client">>.
20num_set_partitions() -> 4.
21num_docs() -> 1000.
22num_docs_pp() -> num_docs() div num_set_partitions().
23
24-define(DCP_MSG_SIZE_MUTATION, 55).
25-define(DCP_MSG_SIZE_DELETION, 42).
26-define(DCP_MSG_SIZE_SNAPSHOT , 44).
27-define(DCP_MSG_SIZE_STREAM_END, 28).
28
29main(_) ->
30    test_util:init_code_path(),
31
32    etap:plan(56),
33    case (catch test()) of
34        ok ->
35            etap:end_tests();
36        Other ->
37            etap:diag(io_lib:format("Test died abnormally: ~p", [Other])),
38            etap:bail(Other)
39    end,
40    %init:stop(),
41    %receive after infinity -> ok end,
42    ok.
43
44
45test() ->
46    couch_set_view_test_util:start_server(test_set_name()),
47    setup_test(),
48
49    tests(),
50    test_close_during_request(),
51
52    couch_set_view_test_util:stop_server(),
53    ok.
54
55tests() ->
56    % Populate failover log
57    FailoverLogs = lists:map(fun(PartId) ->
58        FailoverLog = [
59            {10001, PartId + 3}, {10002, PartId + 2}, {10003, 0}],
60        couch_dcp_fake_server:set_failover_log(PartId, FailoverLog),
61        FailoverLog
62    end, lists:seq(0, num_set_partitions() - 1)),
63
64    TestFun = fun(Item, Acc) ->
65        case Item of
66        {snapshot_marker, _} ->
67            Acc;
68        {part_versions, _} ->
69            Acc;
70        _ ->
71            Acc ++ [Item]
72        end
73    end,
74
75    AddStreamFun = fun(Pid, PartId, PartUuid, StartSeq, EndSeq, Flags) ->
76        couch_dcp_client:add_stream(Pid, PartId, PartUuid, StartSeq, EndSeq, Flags)
77    end,
78
79    {auth, User, Passwd} = cb_auth_info:get(),
80    {ok, Pid} = couch_dcp_client:start(
81        test_set_name(), test_set_name(), User, Passwd, 1024, 0),
82
83    % Get the latest partition version first
84    {ok, InitialFailoverLog0} = couch_dcp_client:get_failover_log(Pid, 0),
85    etap:is(InitialFailoverLog0, hd(FailoverLogs), "Failover log is correct"),
86
87    % First parameter is the partition, the second is the sequence number
88    % to start at.
89    {ok, Docs1, FailoverLog1} = couch_dcp_client:enum_docs_since(
90        Pid, 0, InitialFailoverLog0, 4, 10, ?DCP_FLAG_NOFLAG, TestFun, [], AddStreamFun),
91    etap:is(length(Docs1), 6, "Correct number of docs (6) in partition 0"),
92    etap:is(FailoverLog1, lists:nth(1, FailoverLogs),
93        "Failoverlog from partition 0 is correct"),
94
95    {ok, InitialFailoverLog1} = couch_dcp_client:get_failover_log(Pid, 1),
96    {ok, Docs2, FailoverLog2} = couch_dcp_client:enum_docs_since(
97        Pid, 1, InitialFailoverLog1, 46, 165, ?DCP_FLAG_NOFLAG, TestFun, [], AddStreamFun),
98    etap:is(length(Docs2), 119, "Correct number of docs (109) partition 1"),
99    etap:is(FailoverLog2, lists:nth(2, FailoverLogs),
100        "Failoverlog from partition 1 is correct"),
101
102    {ok, InitialFailoverLog2} = couch_dcp_client:get_failover_log(Pid, 2),
103    {ok, Docs3, FailoverLog3} = couch_dcp_client:enum_docs_since(
104        Pid, 2, InitialFailoverLog2, 80, num_docs() div num_set_partitions(),
105        ?DCP_FLAG_NOFLAG, TestFun, [], AddStreamFun),
106    Expected3 = (num_docs() div num_set_partitions()) - 80,
107    etap:is(length(Docs3), Expected3,
108        io_lib:format("Correct number of docs (~p) partition 2", [Expected3])),
109    etap:is(FailoverLog3, lists:nth(3, FailoverLogs),
110        "Failoverlog from partition 2 is correct"),
111
112    {ok, InitialFailoverLog3} = couch_dcp_client:get_failover_log(Pid, 3),
113    {ok, Docs4, FailoverLog4} = couch_dcp_client:enum_docs_since(
114        Pid, 3, InitialFailoverLog3, 0, 5, ?DCP_FLAG_NOFLAG, TestFun, [], AddStreamFun),
115    etap:is(length(Docs4), 5, "Correct number of docs (5) partition 3"),
116    etap:is(FailoverLog4, lists:nth(4, FailoverLogs),
117        "Failoverlog from partition 3 is correct"),
118
119    % Try a too high sequence number to get a erange error response
120    {error, ErangeError} = couch_dcp_client:enum_docs_since(
121        Pid, 0, InitialFailoverLog0, 400, 450, ?DCP_FLAG_NOFLAG, TestFun, [], AddStreamFun),
122    etap:is(ErangeError, wrong_start_sequence_number,
123        "Correct error message for too high sequence number"),
124
125    % Start sequence is bigger than end sequence
126    {_Request, ErangeError2} =
127        couch_dcp_client:add_stream(
128            Pid, 0, first_uuid(InitialFailoverLog0), 5, 2,
129            ?DCP_FLAG_NOFLAG),
130    etap:is(ErangeError2, {error, wrong_start_sequence_number},
131        "Correct error message for start sequence > end sequence"),
132
133    Error = couch_dcp_client:enum_docs_since(
134        Pid, 1, [{4455667788, 1243}], 46, 165, ?DCP_FLAG_NOFLAG, TestFun, [], AddStreamFun),
135    etap:is(Error, {rollback, 0},
136        "Correct error for wrong failover log"),
137
138    {ok, [Seq0]} = couch_dcp_client:get_seqs(Pid, [0]),
139    etap:is(Seq0, {0, num_docs() div num_set_partitions()},
140        "Sequence number of partition 0 is correct"),
141    {ok, [Seq1]} = couch_dcp_client:get_seqs(Pid, [1]),
142    etap:is(Seq1, {1, num_docs() div num_set_partitions()},
143        "Sequence number of partition 1 is correct"),
144    {ok, [Seq2]} = couch_dcp_client:get_seqs(Pid, [2]),
145    etap:is(Seq2, {2, num_docs() div num_set_partitions()},
146         "Sequence number of partition 2 is correct"),
147    {ok, [Seq3]} = couch_dcp_client:get_seqs(Pid, [3]),
148    etap:is(Seq3, {3, num_docs() div num_set_partitions()},
149        "Sequence number of partition 3 is correct"),
150    SeqMissing = couch_dcp_client:get_seqs(Pid, [100000]),
151    etap:is(SeqMissing, {ok, []},
152        "Too high partition number returns correct error"),
153    SeqAll = couch_dcp_client:get_seqs(Pid, nil),
154    etap:is(SeqAll, {ok, [Seq0, Seq1, Seq2, Seq3]},
155        "Returned all partition seqs correctly"),
156
157
158    % Test snapshot markers types
159
160    TestSnapshotFun = fun(Item, Acc) ->
161        case Item of
162        {snapshot_marker, Marker} ->
163            Acc ++ [Marker];
164        {part_versions, _} ->
165            Acc;
166        _ ->
167            Acc
168        end
169    end,
170
171    SnapshotStart1 = 0,
172    SnapshotEnd1 = num_docs() div (num_set_partitions() * 2),
173    {ok, Markers1, SnapshotFailoverLog1} = couch_dcp_client:enum_docs_since(
174        Pid, 2, [{0, 0}], SnapshotStart1, SnapshotEnd1, ?DCP_FLAG_NOFLAG,
175        TestSnapshotFun, [], AddStreamFun),
176    ExpectedMarkers1 = [{SnapshotStart1, SnapshotEnd1,
177        ?DCP_SNAPSHOT_TYPE_DISK}],
178    etap:is(Markers1, ExpectedMarkers1,
179        "Received one on-disk snapshot marker"),
180
181    SnapshotStart2 = num_docs() div (num_set_partitions() * 2),
182    SnapshotEnd2 = num_docs() div num_set_partitions(),
183    {ok, Markers2, SnapshotFailoverLog2} = couch_dcp_client:enum_docs_since(
184        Pid, 2, SnapshotFailoverLog1, SnapshotStart2, SnapshotEnd2,
185        ?DCP_FLAG_NOFLAG, TestSnapshotFun, [], AddStreamFun),
186    ExpectedMarkers2 = [{SnapshotStart2, SnapshotEnd2,
187        ?DCP_SNAPSHOT_TYPE_MEMORY}],
188    etap:is(Markers2, ExpectedMarkers2,
189        "Received one in-memory snapshot marker"),
190
191
192    % Test multiple snapshots
193
194    TestAllFun = fun(Item, Acc) -> Acc ++ [Item] end,
195
196    ItemsPerSnapshot = 30,
197    couch_dcp_fake_server:set_items_per_snapshot(ItemsPerSnapshot),
198    SnapshotStart3 = 0,
199    SnapshotEnd3 = num_docs() div num_set_partitions(),
200    {ok, All3, SnapshotFailoverLog3} = couch_dcp_client:enum_docs_since(
201        Pid, 2, [{0, 0}], SnapshotStart3, SnapshotEnd3, ?DCP_FLAG_NOFLAG,
202        TestAllFun, [], AddStreamFun),
203    Markers3 = [M || {snapshot_marker, M} <- All3],
204    ExpectedMarkers3 = [{0, ItemsPerSnapshot, ?DCP_SNAPSHOT_TYPE_DISK}] ++
205        lists:map(fun(I) ->
206            {I, min(I + ItemsPerSnapshot, SnapshotEnd3),
207                ?DCP_SNAPSHOT_TYPE_MEMORY}
208        end, lists:seq(ItemsPerSnapshot, SnapshotEnd3 - 1, ItemsPerSnapshot)),
209    etap:is(Markers3, ExpectedMarkers3,
210        "Received the expected snapshot markers"),
211
212    Mutations3 = [M || #dcp_doc{} = M <- All3],
213    couch_dcp_fake_server:set_items_per_snapshot(0),
214    {ok, ExpectedMutations3, SnapshotFailoverLog3} =
215            couch_dcp_client:enum_docs_since(
216        Pid, 2, [{0, 0}], SnapshotStart3, SnapshotEnd3, ?DCP_FLAG_NOFLAG,
217        TestFun, [], AddStreamFun),
218    etap:is(Mutations3, ExpectedMutations3,
219        "Received the expected mutations within the several snapshots"),
220
221
222    % Test duplicated items in multiple snapshots
223
224    ItemsPerSnapshot2 = 30,
225    DupsPerSnapshot2 = 4,
226    couch_dcp_fake_server:set_items_per_snapshot(ItemsPerSnapshot2),
227    couch_dcp_fake_server:set_dups_per_snapshot(DupsPerSnapshot2),
228    DupsStart2 = 0,
229    DupsEnd2 = couch_dcp_fake_server:num_items_with_dups(
230        num_docs_pp(), ItemsPerSnapshot2, DupsPerSnapshot2),
231
232    {ok, AllDups2, DupsFailoverLog2} = couch_dcp_client:enum_docs_since(
233        Pid, 2, [{0, 0}], DupsStart2, DupsEnd2, ?DCP_FLAG_NOFLAG,
234        TestAllFun, [], AddStreamFun),
235    DupsMutations2 = [M || #dcp_doc{} = M <- AllDups2],
236    DupsMarkers2 = [M || {snapshot_marker, M} <- AllDups2],
237    etap:is(length(DupsMutations2), DupsEnd2,
238        "received the expected number of mutations (incl. duplicates)"),
239    etap:is(length(DupsMarkers2),
240        couch_dcp_fake_server:ceil_div(DupsEnd2, ItemsPerSnapshot2),
241        "received the expected number of snapshots"),
242    DupsUnique2 = lists:ukeysort(#dcp_doc.id, DupsMutations2),
243    DupsUniqueIds2 = [Id || #dcp_doc{id = Id} <- DupsUnique2],
244    MutationsIds2 = [Id || #dcp_doc{id = Id} <- Mutations3],
245    etap:is(DupsUniqueIds2, MutationsIds2,
246        "received the expected mutations when de-duplicated"),
247    couch_dcp_fake_server:set_items_per_snapshot(0),
248    couch_dcp_fake_server:set_dups_per_snapshot(0),
249
250
251    % Test multiple streams in parallel
252    {StreamReq0, {failoverlog, InitialFailoverLog0}} =
253        couch_dcp_client:add_stream(
254            Pid, 0, first_uuid(InitialFailoverLog0), 10, 100,
255            ?DCP_FLAG_NOFLAG),
256
257    {StreamReq1, {failoverlog, InitialFailoverLog1}} =
258        couch_dcp_client:add_stream(
259            Pid, 1, first_uuid(InitialFailoverLog1), 100, 200,
260            ?DCP_FLAG_NOFLAG),
261
262    {StreamReq2, {failoverlog, InitialFailoverLog2}} =
263        couch_dcp_client:add_stream(
264            Pid, 2, first_uuid(InitialFailoverLog2), 0, 10, ?DCP_FLAG_NOFLAG),
265
266    [MutationsPart0, MutationsPart1, MutationsPart2] = read_mutations(
267                    Pid, [StreamReq0, StreamReq1, StreamReq2], [[], [], []]),
268
269
270    etap:is(is_same_partition(0, MutationsPart0), true,
271        "Stream0 has only partition0 mutations"),
272    etap:is(is_same_partition(1, MutationsPart1), true,
273        "Stream1 has only partition1 mutations"),
274    etap:is(is_same_partition(2, MutationsPart2), true,
275        "Stream2 has only partition2 mutations"),
276
277    etap:is(length(MutationsPart0), 90,
278        "Stream0 has 90 mutations"),
279    etap:is(length(MutationsPart1), 100,
280        "Stream1 has 100 mutations"),
281    etap:is(length(MutationsPart2), 10,
282        "Stream2 has 10 mutations"),
283
284    {active_list_streams, StreamList0} = couch_dcp_client:list_streams(Pid),
285    etap:is(StreamList0, [], "Stream list is empty"),
286
287    couch_dcp_fake_server:pause_mutations(),
288    {StreamReq0_2, {failoverlog, InitialFailoverLog0}} =
289        couch_dcp_client:add_stream(
290            Pid, 0, first_uuid(InitialFailoverLog0), 1, 100, ?DCP_FLAG_NOFLAG),
291
292    {_, StreamResp0_3} =
293        couch_dcp_client:add_stream(
294            Pid, 0, first_uuid(InitialFailoverLog0), 10, 100,
295            ?DCP_FLAG_NOFLAG),
296    etap:is(StreamResp0_3, {error,vbucket_stream_already_exists},
297        "Stream for vbucket 0 already exists"),
298    couch_dcp_fake_server:continue_mutations(),
299
300    % Drain all mutations
301    read_mutations(Pid, [StreamReq0_2], [[]]),
302
303    couch_dcp_fake_server:pause_mutations(),
304    couch_dcp_client:add_stream(
305        Pid, 1, first_uuid(InitialFailoverLog1), 10, 300, ?DCP_FLAG_NOFLAG),
306
307    couch_dcp_client:add_stream(
308        Pid, 2, first_uuid(InitialFailoverLog2), 100, 200, ?DCP_FLAG_NOFLAG),
309
310    {active_list_streams, StreamList1} = couch_dcp_client:list_streams(Pid),
311    etap:is(StreamList1, [1,2], "Stream list contains parititon 1,2"),
312
313    StreamRemoveResp0 = couch_dcp_client:remove_stream(Pid, 1),
314    {active_list_streams, StreamList2} = couch_dcp_client:list_streams(Pid),
315    etap:is({StreamRemoveResp0, StreamList2}, {ok, [2]},
316        "Removed parititon stream 1 and parition stream 2 is left"),
317
318    StreamRemoveResp1 = couch_dcp_client:remove_stream(Pid, 1),
319    etap:is(StreamRemoveResp1, {error, vbucket_stream_not_found},
320        "Correct error on trying to remove non-existing stream"),
321    couch_dcp_fake_server:continue_mutations(),
322
323    % Test with too large failover log
324    TooLargeFailoverLog = [{I, I} ||
325        I <- lists:seq(0, ?DCP_MAX_FAILOVER_LOG_SIZE)],
326    PartId = 1,
327    couch_dcp_fake_server:set_failover_log(PartId, TooLargeFailoverLog),
328    TooLargeError = couch_dcp_client:enum_docs_since(
329          Pid, PartId, [{0, 0}], 0, 100, ?DCP_FLAG_NOFLAG, TestFun, [], AddStreamFun),
330    etap:is(TooLargeError, {error, too_large_failover_log},
331        "Too large failover log returns correct error"),
332
333    gen_server:call(Pid, reset_buffer_size),
334    % Remove existing streams
335    couch_dcp_client:remove_stream(Pid, 0),
336    couch_dcp_client:remove_stream(Pid, 1),
337    couch_dcp_client:remove_stream(Pid, 2),
338
339
340    % Tests for flow control
341    couch_dcp_fake_server:pause_mutations(),
342
343    {StreamReq0_4, _} = couch_dcp_client:add_stream(
344        Pid, 0, first_uuid(InitialFailoverLog0), 0, 500, ?DCP_FLAG_NOFLAG),
345
346    gen_server:call(Pid, reset_buffer_size),
347    Ret = couch_dcp_fake_server:is_control_req(),
348
349    etap:is(Ret, true, "Buffer control request sent"),
350
351    NumBufferAck = couch_dcp_fake_server:get_num_buffer_acks(),
352
353    % Fill the client buffer by asking server to send 1024 bytes data
354    try_until_throttled(Pid, StreamReq0_4, 1000, 1024),
355
356    % Consume 200 bytes from the client
357    try_until_unthrottled(Pid, StreamReq0_4, 0, 200),
358
359    NumBufferAck2 = couch_dcp_fake_server:get_num_buffer_acks(),
360    etap:is(NumBufferAck2, NumBufferAck, "Not sent the buffer ack"),
361
362    % Consume More data so that is greater then 20 % of 1024 i.e.204.
363    % when data is 20% consumed, client sends the buffer ack to increase
364    % the flow control buffer.
365    try_until_unthrottled(Pid, StreamReq0_4, 0, 200),
366    timer:sleep(2),
367    NumBufferAck3 = couch_dcp_fake_server:get_num_buffer_acks(),
368    etap:is(NumBufferAck3, NumBufferAck + 1, "Got the buffer ack"),
369    couch_dcp_client:remove_stream(Pid, 0),
370
371    couch_dcp_fake_server:pause_mutations(),
372    {StreamReq1_2, _} = couch_dcp_client:add_stream(
373        Pid, 1, first_uuid(InitialFailoverLog1), 0, 500, ?DCP_FLAG_NOFLAG),
374
375    ReqPid = spawn(fun() ->
376        couch_dcp_client:get_stream_event(Pid, StreamReq1_2)
377        end),
378
379    EvResponse = couch_dcp_client:get_stream_event(Pid, StreamReq1_2),
380    etap:is(EvResponse, {error, event_request_already_exists},
381        "Error message received when requested multiple times for same stream's event"),
382
383    exit(ReqPid, shutdown),
384    couch_dcp_client:remove_stream(Pid, 1),
385    gen_server:call(Pid, flush_old_streams_meta),
386
387    {StreamReq0_5, _} = couch_dcp_client:add_stream(Pid, 2,
388        first_uuid(InitialFailoverLog2), 0, 500, ?DCP_FLAG_NOFLAG),
389
390    % Connection close and reconnection tests
391    ok = couch_dcp_fake_server:send_single_mutation(),
392    {snapshot_mutation, Mutation1} = couch_dcp_client:get_stream_event(Pid, StreamReq0_5),
393    #dcp_doc{seq = SeqNo1} = Mutation1,
394    OldSocket = gen_server:call(Pid, get_socket),
395    ok = couch_dcp_fake_server:close_connection(2),
396    % wait for sometime to do reconnect from couch_dcp client
397    timer:sleep(4000),
398    NewSocket = gen_server:call(Pid, get_socket),
399    etap:isnt(OldSocket, NewSocket, "Socket changed"),
400    ok = couch_dcp_fake_server:send_single_mutation(),
401    ErrorResp = couch_dcp_client:get_stream_event(Pid, StreamReq0_5),
402    etap:is({error, dcp_conn_closed}, ErrorResp, "Got the error response after connection close"),
403    {active_list_streams, EmptyStreamList} = couch_dcp_client:list_streams(Pid),
404    etap:is([], EmptyStreamList, "Stream is correctly removed after connection close"),
405
406    ok = couch_dcp_fake_server:close_connection(nil),
407    % wait for sometime to do reconnect from couch_dcp client
408    timer:sleep(4000),
409    couch_dcp_client:remove_stream(Pid, 2),
410    couch_dcp_fake_server:continue_mutations(),
411
412    % Test get_num_items
413
414    {ok, NumItems0} = couch_dcp_client:get_num_items(Pid, 0),
415    etap:is(NumItems0, num_docs() div num_set_partitions(),
416        "Number of items of partition 0 is correct"),
417    {ok, NumItems1} = couch_dcp_client:get_num_items(Pid, 1),
418    etap:is(NumItems1, num_docs() div num_set_partitions(),
419        "Number of items of partition 1 is correct"),
420    {ok, NumItems2} = couch_dcp_client:get_num_items(Pid, 2),
421    etap:is(NumItems2, num_docs() div num_set_partitions(),
422         "Number of items of partition 2 is correct"),
423    {ok, NumItems3} = couch_dcp_client:get_num_items(Pid, 3),
424    etap:is(NumItems3, num_docs() div num_set_partitions(),
425        "Number of items of partition 3 is correct"),
426    NumItemsError = couch_dcp_client:get_num_items(Pid, 100000),
427    etap:is(NumItemsError, {error, not_my_vbucket},
428        "Too high partition number returns correct error"),
429
430    DocsPerPartition = num_docs() div num_set_partitions(),
431    NumDelDocs0 = num_docs() div (num_set_partitions() * 2),
432    delete_docs(1, NumDelDocs0),
433    {ok, NumItemsDel0} = couch_dcp_client:get_num_items(Pid, 0),
434    etap:is(NumItemsDel0, DocsPerPartition - NumItemsDel0,
435        "Number of items of partition 0 after some deletions is correct"),
436    NumDelDocs3 = num_docs() div (num_set_partitions() * 4),
437    delete_docs(3 * DocsPerPartition + 1, NumDelDocs3),
438    {ok, NumItemsDel3} = couch_dcp_client:get_num_items(Pid, 3),
439    etap:is(NumItemsDel3, DocsPerPartition - NumDelDocs3,
440        "Number of items of partition 3 after some deletions is correct"),
441
442    % Tests for requesting persisted items only
443
444    couch_dcp_fake_server:set_persisted_items_fun(fun(Seq) -> Seq  end),
445    {ok, [{1, HighSeq1}]} = couch_dcp_client:get_seqs(Pid, [1]),
446    {ok, ExpectedDocs1, _} =
447        couch_dcp_client:enum_docs_since(
448            Pid, 0, InitialFailoverLog0, 0, HighSeq1, ?DCP_FLAG_NOFLAG,
449            TestFun, [], AddStreamFun),
450    {ok, PersistedDocs1, _} =
451        couch_dcp_client:enum_docs_since(
452            Pid, 0, InitialFailoverLog0, 0, HighSeq1, ?DCP_FLAG_DISKONLY,
453            TestFun, [], AddStreamFun),
454    etap:is(PersistedDocs1, ExpectedDocs1,
455        "The persisted sequence number is correct, seq"),
456
457    couch_dcp_fake_server:set_persisted_items_fun(
458       fun(Seq) -> Seq div 2 end),
459    {ok, [{1, HighSeq2}]} = couch_dcp_client:get_seqs(Pid, [1]),
460    {ok, ExpectedDocs2, _} =
461        couch_dcp_client:enum_docs_since(
462            Pid, 0, InitialFailoverLog0, 0, HighSeq2 div 2, ?DCP_FLAG_NOFLAG,
463            TestFun, [], AddStreamFun),
464    {ok, PersistedDocs2, _} =
465        couch_dcp_client:enum_docs_since(
466            Pid, 0, InitialFailoverLog0, 0, HighSeq2, ?DCP_FLAG_DISKONLY,
467            TestFun, [], AddStreamFun),
468    etap:is(PersistedDocs2, ExpectedDocs2,
469        "The persisted sequence number is correct, seq/2"),
470
471    couch_dcp_fake_server:set_persisted_items_fun(fun(Seq) -> Seq - 1 end),
472    {ok, [{1, HighSeq3}]} = couch_dcp_client:get_seqs(Pid, [1]),
473    {ok, ExpectedDocs3, _} =
474        couch_dcp_client:enum_docs_since(
475            Pid, 0, InitialFailoverLog0, 0, HighSeq3 - 1, ?DCP_FLAG_NOFLAG,
476            TestFun, [], AddStreamFun),
477    {ok, PersistedDocs3, _} =
478        couch_dcp_client:enum_docs_since(
479            Pid, 0, InitialFailoverLog0, 0, HighSeq3, ?DCP_FLAG_DISKONLY,
480            TestFun, [], AddStreamFun),
481    etap:is(PersistedDocs3, ExpectedDocs3,
482        "The persisted sequence number is correct, seq - 1"),
483    ok.
484
485% Test robustness when the connection is dropped during a get all sequence
486% numbers or stats request
487% This is a regression test for MB-15922 and MB-17026
488test_close_during_request() ->
489    %timer:sleep(1),
490    %couch_set_view_test_util:start_server(test_set_name()),
491    %setup_test(),
492
493    {auth, User, Passwd} = cb_auth_info:get(),
494    {ok, Pid} = couch_dcp_client:start(
495        test_set_name(), test_set_name(), User, Passwd, 1024, 0),
496
497    ParentPid = self(),
498
499    % Get sequencenumbers request
500    spawn(fun() ->
501        couch_dcp_fake_server:close_on_next(),
502        Closed = couch_dcp_client:get_seqs(Pid, [0]),
503        ParentPid ! {ok, Closed}
504    end),
505    receive
506    {ok, Closed} ->
507        etap:is(Closed, {error, dcp_conn_closed},
508            "The connection got (as expected) closed "
509            "during the get sequence numbers request")
510    after 10000 ->
511         etap:bail("Cannot get sequence number on time, the DCP client hangs")
512    end,
513
514    % Stats request
515    spawn(fun() ->
516        couch_dcp_fake_server:close_on_next(),
517        catch couch_dcp_client:get_num_items(Pid, 0),
518        ParentPid ! ok
519    end),
520    receive
521    ok ->
522        etap:ok(true,
523            "The connection got (as expected) closed during the stats request")
524    after 10000 ->
525         etap:bail("Cannot get stats on time, the DCP client hangs")
526    end,
527    ok.
528
529
530try_until_throttled(Pid, ReqId, N, MaxSize) when N > 0 ->
531    ok = couch_dcp_fake_server:send_single_mutation(),
532    timer:sleep(1),
533    Size2 = gen_server:call(Pid, {get_buffer_size, ReqId}),
534    if 
535    MaxSize > Size2 ->
536        try_until_throttled(Pid, ReqId, N - 1, MaxSize);
537    true ->
538        ok
539    end.
540
541try_until_unthrottled(Pid, ReqId, Size, MaxSize) ->
542    Data = couch_dcp_client:get_stream_event(Pid, ReqId),
543    Size2 = Size + get_event_size(Data), 
544    if
545    Size2 < MaxSize ->
546        try_until_unthrottled(Pid, ReqId, Size2, MaxSize);
547    true ->
548        ok
549    end.
550
551setup_test() ->
552    couch_set_view_test_util:delete_set_dbs(test_set_name(), num_set_partitions()),
553    couch_set_view_test_util:create_set_dbs(test_set_name(), num_set_partitions()),
554    populate_set().
555
556is_same_partition(PartId, Docs) ->
557    lists:all(fun({_, #dcp_doc{partition = P}}) ->
558        P =:= PartId
559    end, Docs).
560
561read_streams(Pid, StreamReqs) ->
562    lists:foldr(fun(Request, Acc) ->
563        case couch_dcp_client:get_stream_event(Pid, Request) of
564        {error, vbucket_stream_not_found} ->
565            [drained | Acc];
566        {Optype, _} = Item ->
567            case Optype of
568            stream_end ->
569                [[] | Acc];
570            snapshot_marker ->
571                [[] | Acc];
572            _ ->
573                [[Item] | Acc]
574            end;
575        _ ->
576            [[] | Acc]
577        end
578    end, [], StreamReqs).
579
580read_mutations(Pid, StreamReqs, Acc) ->
581    Items = read_streams(Pid, StreamReqs),
582    case lists:all(fun(Item) -> Item =:= drained end, Items) of
583    true ->
584        Acc;
585    false ->
586        Acc2 = lists:zipwith(fun(Item, Acc0) ->
587            case Item of
588            drained ->
589                Acc0;
590            _ ->
591                Item ++ Acc0
592            end
593        end, Items, Acc),
594        read_mutations(Pid, StreamReqs, Acc2)
595    end.
596
597doc_id(I) ->
598    iolist_to_binary(io_lib:format("doc_~8..0b", [I])).
599
600create_docs(From, To) ->
601    lists:map(
602        fun(I) ->
603            Cas = I,
604            ExpireTime = 0,
605            Flags = 0,
606            RevMeta1 = <<Cas:64/native, ExpireTime:32/native, Flags:32/native>>,
607            RevMeta2 = [[io_lib:format("~2.16.0b",[X]) || <<X:8>> <= RevMeta1 ]],
608            RevMeta3 = iolist_to_binary(RevMeta2),
609            {[
610              {<<"meta">>, {[
611                             {<<"id">>, doc_id(I)},
612                             {<<"rev">>, <<"1-", RevMeta3/binary>>}
613                            ]}},
614              {<<"json">>, {[{<<"value">>, I}]}}
615            ]}
616        end,
617        lists:seq(From, To)).
618
619populate_set() ->
620    etap:diag("Populating the " ++ integer_to_list(num_set_partitions()) ++
621        " databases with " ++ integer_to_list(num_docs()) ++ " documents"),
622    DocList = create_docs(1, num_docs()),
623    ok = couch_set_view_test_util:populate_set_sequentially(
624        test_set_name(),
625        lists:seq(0, num_set_partitions() - 1),
626        DocList).
627
628delete_docs(StartId, NumDocs) ->
629    Dbs = dict:from_list(lists:map(
630        fun(I) ->
631            {ok, Db} = couch_set_view_test_util:open_set_db(
632                test_set_name(), I),
633            {I, Db}
634        end,
635        lists:seq(0, num_set_partitions() - 1))),
636    Docs = lists:foldl(
637        fun(I, Acc) ->
638            Doc = couch_doc:from_json_obj({[
639                {<<"meta">>, {[{<<"deleted">>, true}, {<<"id">>, doc_id(I)}]}},
640                {<<"json">>, {[]}}
641            ]}),
642            DocsPerPartition = num_docs() div num_set_partitions(),
643            DocList = case orddict:find(I div DocsPerPartition, Acc) of
644            {ok, L} ->
645                L;
646            error ->
647                []
648            end,
649            orddict:store(I div DocsPerPartition, [Doc | DocList], Acc)
650        end,
651        orddict:new(), lists:seq(StartId, StartId + NumDocs - 1)),
652    [] = orddict:fold(
653        fun(I, DocList, Acc) ->
654            Db = dict:fetch(I, Dbs),
655            etap:diag("Deleting " ++ integer_to_list(length(DocList)) ++
656                " documents from partition " ++ integer_to_list(I)),
657            ok = couch_db:update_docs(Db, DocList, [sort_docs]),
658            Acc
659        end,
660        [], Docs),
661    ok = lists:foreach(fun({_, Db}) ->
662        ok = couch_db:close(Db)
663    end, dict:to_list(Dbs)).
664
665
666first_uuid(FailoverLog) ->
667    element(1, hd(FailoverLog)).
668
669receive_mutation(0, _, _) ->
670    ok;
671receive_mutation(Count, Pid, Stream) ->
672    couch_dcp_fake_server:send_single_mutation(),
673    couch_dcp_client:get_stream_event(Pid, Stream),
674    receive_mutation(Count - 1, Pid, Stream).
675
676get_event_size({Type, Doc}) ->
677    case Type of
678        snapshot_mutation ->
679            #dcp_doc {
680                id = Key,
681                body = Value
682            } = Doc,
683            ?DCP_MSG_SIZE_MUTATION + erlang:external_size(Key) + erlang:external_size(Value);
684        stream_end ->
685            ?DCP_MSG_SIZE_STREAM_END;
686        snapshot_marker ->
687            ?DCP_MSG_SIZE_SNAPSHOT;
688        snapshot_deletion ->
689            #dcp_doc {
690                id = Key
691            } = Doc,
692           ?DCP_MSG_SIZE_DELETION + erlang:external_size(Key)
693    end.
694