1#!/usr/bin/env escript
2%% -*- erlang -*-
3%%! -smp enable
4
5% Licensed under the Apache License, Version 2.0 (the "License"); you may not
6% use this file except in compliance with the License. You may obtain a copy of
7% the License at
8%
9%   http://www.apache.org/licenses/LICENSE-2.0
10%
11% Unless required by applicable law or agreed to in writing, software
12% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14% License for the specific language governing permissions and limitations under
15% the License.
16
17-include_lib("couch_dcp/include/couch_dcp.hrl").
18
% Fixture parameters shared by the whole script.

% Name passed to the couch_set_view_test_util helpers and to
% couch_dcp_client:start/6.
test_set_name() ->
    <<"couch_test_couch_dcp_client">>.

% Number of partitions the test set is created with.
num_set_partitions() ->
    4.

% Total number of documents loaded by populate_set/0.
num_docs() ->
    1000.

% Number of documents per partition (integer division of the two above).
num_docs_pp() ->
    num_docs() div num_set_partitions().
23
% Fixed per-event sizes that get_event_size/1 adds to the size of the
% document key (and body, for mutations) when accounting consumed bytes.
-define(DCP_MSG_SIZE_MUTATION, 55).
-define(DCP_MSG_SIZE_DELETION, 42).
-define(DCP_MSG_SIZE_SNAPSHOT, 44).
-define(DCP_MSG_SIZE_STREAM_END, 28).
28
% escript entry point. Sets up the code path, plans 56 etap assertions and
% runs test/0. Any outcome other than `ok` (including a caught exception) is
% printed as a diagnostic and handed to etap:bail/1.
main(_Args) ->
    test_util:init_code_path(),

    etap:plan(56),
    Outcome = (catch test()),
    if
        Outcome =:= ok ->
            etap:end_tests();
        true ->
            Report = io_lib:format("Test died abnormally: ~p", [Outcome]),
            etap:diag(Report),
            etap:bail(Outcome)
    end,
    ok.
41
42
% Runs the whole suite: starts the test server, (re)creates and populates the
% test set, executes the test groups in order and stops the server again.
test() ->
    couch_set_view_test_util:start_server(test_set_name()),
    setup_test(),

    % The groups run strictly in this order; their return values are ignored.
    Groups = [fun tests/0, fun test_close_during_request/0],
    lists:foreach(fun(Group) -> Group() end, Groups),

    couch_set_view_test_util:stop_server(),
    ok.
52
% Main DCP client test group. In order, it exercises: failover logs,
% enum_docs_since ranges and error responses, get_seqs, snapshot marker types,
% multiple snapshots, duplicated items across snapshots, parallel streams,
% adding/removing streams, a too large failover log, flow control buffer
% acknowledgements, reconnection after the connection got closed,
% get_num_items (also after deletions) and streams restricted to persisted
% items. It emits 54 of the 56 etap assertions planned in main/1.
% Returns `ok`; any failed pattern match crashes and is reported by main/1.
tests() ->
    % Populate failover log
    FailoverLogs = lists:map(fun(PartId) ->
        FailoverLog = [
            {10001, PartId + 3}, {10002, PartId + 2}, {10003, 0}],
        couch_dcp_fake_server:set_failover_log(PartId, FailoverLog),
        FailoverLog
    end, lists:seq(0, num_set_partitions() - 1)),

    % Collects every streamed item except snapshot markers and partition
    % versions, in arrival order.
    TestFun = fun(Item, Acc) ->
        case Item of
        {snapshot_marker, _} ->
            Acc;
        {part_versions, _} ->
            Acc;
        _ ->
            Acc ++ [Item]
        end
    end,

    AddStreamFun = fun(Pid, PartId, PartUuid, StartSeq, EndSeq, Flags) ->
        couch_dcp_client:add_stream(Pid, PartId, PartUuid, StartSeq, EndSeq, Flags)
    end,

    {auth, User, Passwd} = cb_auth_info:get(),
    {ok, Pid} = couch_dcp_client:start(
        test_set_name(), test_set_name(), User, Passwd, 1024, 0),

    % Get the latest partition version first
    {ok, InitialFailoverLog0} = couch_dcp_client:get_failover_log(Pid, 0),
    etap:is(InitialFailoverLog0, hd(FailoverLogs), "Failover log is correct"),

    % First parameter is the partition, the second is the sequence number
    % to start at.
    {ok, Docs1, FailoverLog1} = couch_dcp_client:enum_docs_since(
        Pid, 0, InitialFailoverLog0, 4, 10, ?DCP_FLAG_NOFLAG, TestFun, [], AddStreamFun),
    etap:is(length(Docs1), 6, "Correct number of docs (6) in partition 0"),
    etap:is(FailoverLog1, lists:nth(1, FailoverLogs),
        "Failoverlog from partition 0 is correct"),

    {ok, InitialFailoverLog1} = couch_dcp_client:get_failover_log(Pid, 1),
    {ok, Docs2, FailoverLog2} = couch_dcp_client:enum_docs_since(
        Pid, 1, InitialFailoverLog1, 46, 165, ?DCP_FLAG_NOFLAG, TestFun, [], AddStreamFun),
    % 165 - 46 = 119 documents are expected
    etap:is(length(Docs2), 119, "Correct number of docs (119) partition 1"),
    etap:is(FailoverLog2, lists:nth(2, FailoverLogs),
        "Failoverlog from partition 1 is correct"),

    {ok, InitialFailoverLog2} = couch_dcp_client:get_failover_log(Pid, 2),
    {ok, Docs3, FailoverLog3} = couch_dcp_client:enum_docs_since(
        Pid, 2, InitialFailoverLog2, 80, num_docs() div num_set_partitions(),
        ?DCP_FLAG_NOFLAG, TestFun, [], AddStreamFun),
    Expected3 = (num_docs() div num_set_partitions()) - 80,
    etap:is(length(Docs3), Expected3,
        io_lib:format("Correct number of docs (~p) partition 2", [Expected3])),
    etap:is(FailoverLog3, lists:nth(3, FailoverLogs),
        "Failoverlog from partition 2 is correct"),

    {ok, InitialFailoverLog3} = couch_dcp_client:get_failover_log(Pid, 3),
    {ok, Docs4, FailoverLog4} = couch_dcp_client:enum_docs_since(
        Pid, 3, InitialFailoverLog3, 0, 5, ?DCP_FLAG_NOFLAG, TestFun, [], AddStreamFun),
    etap:is(length(Docs4), 5, "Correct number of docs (5) partition 3"),
    etap:is(FailoverLog4, lists:nth(4, FailoverLogs),
        "Failoverlog from partition 3 is correct"),

    % Try a too high sequence number to get an erange error response
    {error, ErangeError} = couch_dcp_client:enum_docs_since(
        Pid, 0, InitialFailoverLog0, 400, 450, ?DCP_FLAG_NOFLAG, TestFun, [], AddStreamFun),
    etap:is(ErangeError, wrong_start_sequence_number,
        "Correct error message for too high sequence number"),

    % Start sequence is bigger than end sequence
    {_Request, ErangeError2} =
        couch_dcp_client:add_stream(
            Pid, 0, first_uuid(InitialFailoverLog0), 5, 2,
            ?DCP_FLAG_NOFLAG),
    etap:is(ErangeError2, {error, wrong_start_sequence_number},
        "Correct error message for start sequence > end sequence"),

    % A failover log the server does not know must trigger a rollback
    Error = couch_dcp_client:enum_docs_since(
        Pid, 1, [{4455667788, 1243}], 46, 165, ?DCP_FLAG_NOFLAG, TestFun, [], AddStreamFun),
    etap:is(Error, {rollback, 0},
        "Correct error for wrong failover log"),

    {ok, [Seq0]} = couch_dcp_client:get_seqs(Pid, [0]),
    etap:is(Seq0, {0, num_docs() div num_set_partitions()},
        "Sequence number of partition 0 is correct"),
    {ok, [Seq1]} = couch_dcp_client:get_seqs(Pid, [1]),
    etap:is(Seq1, {1, num_docs() div num_set_partitions()},
        "Sequence number of partition 1 is correct"),
    {ok, [Seq2]} = couch_dcp_client:get_seqs(Pid, [2]),
    etap:is(Seq2, {2, num_docs() div num_set_partitions()},
         "Sequence number of partition 2 is correct"),
    {ok, [Seq3]} = couch_dcp_client:get_seqs(Pid, [3]),
    etap:is(Seq3, {3, num_docs() div num_set_partitions()},
        "Sequence number of partition 3 is correct"),
    SeqMissing = couch_dcp_client:get_seqs(Pid, [100000]),
    etap:is(SeqMissing, {ok, []},
        "Too high partition number returns correct error"),
    SeqAll = couch_dcp_client:get_seqs(Pid, nil),
    etap:is(SeqAll, {ok, [Seq0, Seq1, Seq2, Seq3]},
        "Returned all partition seqs correctly"),


    % Test snapshot markers types

    % Collects only the payload of snapshot markers, in arrival order.
    TestSnapshotFun = fun(Item, Acc) ->
        case Item of
        {snapshot_marker, Marker} ->
            Acc ++ [Marker];
        {part_versions, _} ->
            Acc;
        _ ->
            Acc
        end
    end,

    SnapshotStart1 = 0,
    SnapshotEnd1 = num_docs() div (num_set_partitions() * 2),
    {ok, Markers1, SnapshotFailoverLog1} = couch_dcp_client:enum_docs_since(
        Pid, 2, [{0, 0}], SnapshotStart1, SnapshotEnd1, ?DCP_FLAG_NOFLAG,
        TestSnapshotFun, [], AddStreamFun),
    ExpectedMarkers1 = [{SnapshotStart1, SnapshotEnd1,
        ?DCP_SNAPSHOT_TYPE_DISK}],
    etap:is(Markers1, ExpectedMarkers1,
        "Received one on-disk snapshot marker"),

    SnapshotStart2 = num_docs() div (num_set_partitions() * 2),
    SnapshotEnd2 = num_docs() div num_set_partitions(),
    {ok, Markers2, _SnapshotFailoverLog2} = couch_dcp_client:enum_docs_since(
        Pid, 2, SnapshotFailoverLog1, SnapshotStart2, SnapshotEnd2,
        ?DCP_FLAG_NOFLAG, TestSnapshotFun, [], AddStreamFun),
    ExpectedMarkers2 = [{SnapshotStart2, SnapshotEnd2,
        ?DCP_SNAPSHOT_TYPE_MEMORY}],
    etap:is(Markers2, ExpectedMarkers2,
        "Received one in-memory snapshot marker"),


    % Test multiple snapshots

    % Collects every streamed item, markers included.
    TestAllFun = fun(Item, Acc) -> Acc ++ [Item] end,

    ItemsPerSnapshot = 30,
    couch_dcp_fake_server:set_items_per_snapshot(ItemsPerSnapshot),
    SnapshotStart3 = 0,
    SnapshotEnd3 = num_docs() div num_set_partitions(),
    {ok, All3, SnapshotFailoverLog3} = couch_dcp_client:enum_docs_since(
        Pid, 2, [{0, 0}], SnapshotStart3, SnapshotEnd3, ?DCP_FLAG_NOFLAG,
        TestAllFun, [], AddStreamFun),
    Markers3 = [M || {snapshot_marker, M} <- All3],
    % The first snapshot is expected to be on-disk, all following in-memory
    ExpectedMarkers3 = [{0, ItemsPerSnapshot, ?DCP_SNAPSHOT_TYPE_DISK}] ++
        lists:map(fun(I) ->
            {I, min(I + ItemsPerSnapshot, SnapshotEnd3),
                ?DCP_SNAPSHOT_TYPE_MEMORY}
        end, lists:seq(ItemsPerSnapshot, SnapshotEnd3 - 1, ItemsPerSnapshot)),
    etap:is(Markers3, ExpectedMarkers3,
        "Received the expected snapshot markers"),

    Mutations3 = [M || #dcp_doc{} = M <- All3],
    couch_dcp_fake_server:set_items_per_snapshot(0),
    % SnapshotFailoverLog3 is already bound, so this match also asserts that
    % both requests return the same failover log.
    {ok, ExpectedMutations3, SnapshotFailoverLog3} =
            couch_dcp_client:enum_docs_since(
        Pid, 2, [{0, 0}], SnapshotStart3, SnapshotEnd3, ?DCP_FLAG_NOFLAG,
        TestFun, [], AddStreamFun),
    etap:is(Mutations3, ExpectedMutations3,
        "Received the expected mutations within the several snapshots"),


    % Test duplicated items in multiple snapshots

    ItemsPerSnapshot2 = 30,
    DupsPerSnapshot2 = 4,
    couch_dcp_fake_server:set_items_per_snapshot(ItemsPerSnapshot2),
    couch_dcp_fake_server:set_dups_per_snapshot(DupsPerSnapshot2),
    DupsStart2 = 0,
    DupsEnd2 = couch_dcp_fake_server:num_items_with_dups(
        num_docs_pp(), ItemsPerSnapshot2, DupsPerSnapshot2),

    {ok, AllDups2, _DupsFailoverLog2} = couch_dcp_client:enum_docs_since(
        Pid, 2, [{0, 0}], DupsStart2, DupsEnd2, ?DCP_FLAG_NOFLAG,
        TestAllFun, [], AddStreamFun),
    DupsMutations2 = [M || #dcp_doc{} = M <- AllDups2],
    DupsMarkers2 = [M || {snapshot_marker, M} <- AllDups2],
    etap:is(length(DupsMutations2), DupsEnd2,
        "received the expected number of mutations (incl. duplicates)"),
    etap:is(length(DupsMarkers2),
        couch_dcp_fake_server:ceil_div(DupsEnd2, ItemsPerSnapshot2),
        "received the expected number of snapshots"),
    DupsUnique2 = lists:ukeysort(#dcp_doc.id, DupsMutations2),
    DupsUniqueIds2 = [Id || #dcp_doc{id = Id} <- DupsUnique2],
    MutationsIds2 = [Id || #dcp_doc{id = Id} <- Mutations3],
    etap:is(DupsUniqueIds2, MutationsIds2,
        "received the expected mutations when de-duplicated"),
    couch_dcp_fake_server:set_items_per_snapshot(0),
    couch_dcp_fake_server:set_dups_per_snapshot(0),


    % Test multiple streams in parallel
    {StreamReq0, {failoverlog, InitialFailoverLog0}} =
        couch_dcp_client:add_stream(
            Pid, 0, first_uuid(InitialFailoverLog0), 10, 100,
            ?DCP_FLAG_NOFLAG),

    {StreamReq1, {failoverlog, InitialFailoverLog1}} =
        couch_dcp_client:add_stream(
            Pid, 1, first_uuid(InitialFailoverLog1), 100, 200,
            ?DCP_FLAG_NOFLAG),

    {StreamReq2, {failoverlog, InitialFailoverLog2}} =
        couch_dcp_client:add_stream(
            Pid, 2, first_uuid(InitialFailoverLog2), 0, 10, ?DCP_FLAG_NOFLAG),

    [MutationsPart0, MutationsPart1, MutationsPart2] = read_mutations(
                    Pid, [StreamReq0, StreamReq1, StreamReq2], [[], [], []]),


    etap:is(is_same_partition(0, MutationsPart0), true,
        "Stream0 has only partition0 mutations"),
    etap:is(is_same_partition(1, MutationsPart1), true,
        "Stream1 has only partition1 mutations"),
    etap:is(is_same_partition(2, MutationsPart2), true,
        "Stream2 has only partition2 mutations"),

    etap:is(length(MutationsPart0), 90,
        "Stream0 has 90 mutations"),
    etap:is(length(MutationsPart1), 100,
        "Stream1 has 100 mutations"),
    etap:is(length(MutationsPart2), 10,
        "Stream2 has 10 mutations"),

    {active_list_streams, StreamList0} = couch_dcp_client:list_streams(Pid),
    etap:is(StreamList0, [], "Stream list is empty"),

    couch_dcp_fake_server:pause_mutations(),
    {StreamReq0_2, {failoverlog, InitialFailoverLog0}} =
        couch_dcp_client:add_stream(
            Pid, 0, first_uuid(InitialFailoverLog0), 1, 100, ?DCP_FLAG_NOFLAG),

    % A second stream for the same partition must be rejected
    {_, StreamResp0_3} =
        couch_dcp_client:add_stream(
            Pid, 0, first_uuid(InitialFailoverLog0), 10, 100,
            ?DCP_FLAG_NOFLAG),
    etap:is(StreamResp0_3, {error,vbucket_stream_already_exists},
        "Stream for vbucket 0 already exists"),
    couch_dcp_fake_server:continue_mutations(),

    % Drain all mutations
    read_mutations(Pid, [StreamReq0_2], [[]]),

    couch_dcp_fake_server:pause_mutations(),
    couch_dcp_client:add_stream(
        Pid, 1, first_uuid(InitialFailoverLog1), 10, 300, ?DCP_FLAG_NOFLAG),

    couch_dcp_client:add_stream(
        Pid, 2, first_uuid(InitialFailoverLog2), 100, 200, ?DCP_FLAG_NOFLAG),

    {active_list_streams, StreamList1} = couch_dcp_client:list_streams(Pid),
    etap:is(StreamList1, [1,2], "Stream list contains parititon 1,2"),

    StreamRemoveResp0 = couch_dcp_client:remove_stream(Pid, 1),
    {active_list_streams, StreamList2} = couch_dcp_client:list_streams(Pid),
    etap:is({StreamRemoveResp0, StreamList2}, {ok, [2]},
        "Removed parititon stream 1 and parition stream 2 is left"),

    StreamRemoveResp1 = couch_dcp_client:remove_stream(Pid, 1),
    etap:is(StreamRemoveResp1, {error, vbucket_stream_not_found},
        "Correct error on trying to remove non-existing stream"),
    couch_dcp_fake_server:continue_mutations(),

    % Test with too large failover log
    % (lists:seq/2 is inclusive, so the log has one entry more than
    % ?DCP_MAX_FAILOVER_LOG_SIZE)
    TooLargeFailoverLog = [{I, I} ||
        I <- lists:seq(0, ?DCP_MAX_FAILOVER_LOG_SIZE)],
    PartId = 1,
    couch_dcp_fake_server:set_failover_log(PartId, TooLargeFailoverLog),
    TooLargeError = couch_dcp_client:enum_docs_since(
          Pid, PartId, [{0, 0}], 0, 100, ?DCP_FLAG_NOFLAG, TestFun, [], AddStreamFun),
    etap:is(TooLargeError, {error, too_large_failover_log},
        "Too large failover log returns correct error"),

    gen_server:call(Pid, reset_buffer_size),
    % Remove existing streams
    couch_dcp_client:remove_stream(Pid, 0),
    couch_dcp_client:remove_stream(Pid, 1),
    couch_dcp_client:remove_stream(Pid, 2),


    % Tests for flow control
    couch_dcp_fake_server:pause_mutations(),

    {StreamReq0_4, _} = couch_dcp_client:add_stream(
        Pid, 0, first_uuid(InitialFailoverLog0), 0, 500, ?DCP_FLAG_NOFLAG),

    gen_server:call(Pid, reset_buffer_size),
    Ret = couch_dcp_fake_server:is_control_req(),

    etap:is(Ret, true, "Buffer control request sent"),

    NumBufferAck = couch_dcp_fake_server:get_num_buffer_acks(),

    % Fill the client buffer by asking server to send 1024 bytes data
    try_until_throttled(Pid, StreamReq0_4, 1000, 1024),

    % Consume 200 bytes from the client
    try_until_unthrottled(Pid, StreamReq0_4, 0, 200),

    NumBufferAck2 = couch_dcp_fake_server:get_num_buffer_acks(),
    etap:is(NumBufferAck2, NumBufferAck, "Not sent the buffer ack"),

    % Consume more data so that it is greater than 20% of 1024, i.e. 204.
    % When 20% of the data is consumed, the client sends the buffer ack to
    % increase the flow control buffer.
    try_until_unthrottled(Pid, StreamReq0_4, 0, 210),
    timer:sleep(500),
    NumBufferAck3 = couch_dcp_fake_server:get_num_buffer_acks(),
    etap:is(NumBufferAck3, NumBufferAck + 1, "Got the buffer ack"),
    couch_dcp_client:remove_stream(Pid, 0),

    couch_dcp_fake_server:pause_mutations(),
    {StreamReq1_2, _} = couch_dcp_client:add_stream(
        Pid, 1, first_uuid(InitialFailoverLog1), 0, 500, ?DCP_FLAG_NOFLAG),

    % NOTE(review): nothing synchronises the spawned request with the one
    % issued below, so this relies on the spawned process reaching the
    % client first - confirm this cannot race.
    ReqPid = spawn(fun() ->
        couch_dcp_client:get_stream_event(Pid, StreamReq1_2)
        end),

    EvResponse = couch_dcp_client:get_stream_event(Pid, StreamReq1_2),
    etap:is(EvResponse, {error, event_request_already_exists},
        "Error message received when requested multiple times for same stream's event"),

    exit(ReqPid, shutdown),
    couch_dcp_client:remove_stream(Pid, 1),
    gen_server:call(Pid, flush_old_streams_meta),

    {StreamReq0_5, _} = couch_dcp_client:add_stream(Pid, 2,
        first_uuid(InitialFailoverLog2), 0, 500, ?DCP_FLAG_NOFLAG),

    % Connection close and reconnection tests
    ok = couch_dcp_fake_server:send_single_mutation(),
    {snapshot_mutation, Mutation1} = couch_dcp_client:get_stream_event(Pid, StreamReq0_5),
    % Only asserts that the event carries a #dcp_doc{} record
    #dcp_doc{seq = _SeqNo1} = Mutation1,
    OldSocket = gen_server:call(Pid, get_socket),
    ok = couch_dcp_fake_server:close_connection(2),
    % Wait for some time to let the couch_dcp client reconnect
    timer:sleep(4000),
    NewSocket = gen_server:call(Pid, get_socket),
    etap:isnt(OldSocket, NewSocket, "Socket changed"),
    ok = couch_dcp_fake_server:send_single_mutation(),
    ErrorResp = couch_dcp_client:get_stream_event(Pid, StreamReq0_5),
    etap:is({error, dcp_conn_closed}, ErrorResp, "Got the error response after connection close"),
    {active_list_streams, EmptyStreamList} = couch_dcp_client:list_streams(Pid),
    etap:is([], EmptyStreamList, "Stream is correctly removed after connection close"),

    ok = couch_dcp_fake_server:close_connection(nil),
    % Wait for some time to let the couch_dcp client reconnect
    timer:sleep(4000),
    couch_dcp_client:remove_stream(Pid, 2),
    couch_dcp_fake_server:continue_mutations(),

    % Test get_num_items

    {ok, NumItems0} = couch_dcp_client:get_num_items(Pid, 0),
    etap:is(NumItems0, num_docs() div num_set_partitions(),
        "Number of items of partition 0 is correct"),
    {ok, NumItems1} = couch_dcp_client:get_num_items(Pid, 1),
    etap:is(NumItems1, num_docs() div num_set_partitions(),
        "Number of items of partition 1 is correct"),
    {ok, NumItems2} = couch_dcp_client:get_num_items(Pid, 2),
    etap:is(NumItems2, num_docs() div num_set_partitions(),
         "Number of items of partition 2 is correct"),
    {ok, NumItems3} = couch_dcp_client:get_num_items(Pid, 3),
    etap:is(NumItems3, num_docs() div num_set_partitions(),
        "Number of items of partition 3 is correct"),
    NumItemsError = couch_dcp_client:get_num_items(Pid, 100000),
    etap:is(NumItemsError, {error, not_my_vbucket},
        "Too high partition number returns correct error"),

    DocsPerPartition = num_docs() div num_set_partitions(),
    NumDelDocs0 = num_docs() div (num_set_partitions() * 2),
    delete_docs(1, NumDelDocs0),
    {ok, NumItemsDel0} = couch_dcp_client:get_num_items(Pid, 0),
    % The expectation must be derived from the number of deleted documents,
    % not from the value under test.
    etap:is(NumItemsDel0, DocsPerPartition - NumDelDocs0,
        "Number of items of partition 0 after some deletions is correct"),
    NumDelDocs3 = num_docs() div (num_set_partitions() * 4),
    delete_docs(3 * DocsPerPartition + 1, NumDelDocs3),
    {ok, NumItemsDel3} = couch_dcp_client:get_num_items(Pid, 3),
    etap:is(NumItemsDel3, DocsPerPartition - NumDelDocs3,
        "Number of items of partition 3 after some deletions is correct"),

    % Tests for requesting persisted items only
    % NOTE(review): the high sequence numbers below are read from partition 1
    % while the streams are opened on partition 0 - confirm this is intended.

    couch_dcp_fake_server:set_persisted_items_fun(fun(Seq) -> Seq  end),
    {ok, [{1, HighSeq1}]} = couch_dcp_client:get_seqs(Pid, [1]),
    {ok, ExpectedDocs1, _} =
        couch_dcp_client:enum_docs_since(
            Pid, 0, InitialFailoverLog0, 0, HighSeq1, ?DCP_FLAG_NOFLAG,
            TestFun, [], AddStreamFun),
    {ok, PersistedDocs1, _} =
        couch_dcp_client:enum_docs_since(
            Pid, 0, InitialFailoverLog0, 0, HighSeq1, ?DCP_FLAG_DISKONLY,
            TestFun, [], AddStreamFun),
    etap:is(PersistedDocs1, ExpectedDocs1,
        "The persisted sequence number is correct, seq"),

    couch_dcp_fake_server:set_persisted_items_fun(
       fun(Seq) -> Seq div 2 end),
    {ok, [{1, HighSeq2}]} = couch_dcp_client:get_seqs(Pid, [1]),
    {ok, ExpectedDocs2, _} =
        couch_dcp_client:enum_docs_since(
            Pid, 0, InitialFailoverLog0, 0, HighSeq2 div 2, ?DCP_FLAG_NOFLAG,
            TestFun, [], AddStreamFun),
    {ok, PersistedDocs2, _} =
        couch_dcp_client:enum_docs_since(
            Pid, 0, InitialFailoverLog0, 0, HighSeq2, ?DCP_FLAG_DISKONLY,
            TestFun, [], AddStreamFun),
    etap:is(PersistedDocs2, ExpectedDocs2,
        "The persisted sequence number is correct, seq/2"),

    couch_dcp_fake_server:set_persisted_items_fun(fun(Seq) -> Seq - 1 end),
    {ok, [{1, HighSeq3}]} = couch_dcp_client:get_seqs(Pid, [1]),
    {ok, ExpectedDocs3, _} =
        couch_dcp_client:enum_docs_since(
            Pid, 0, InitialFailoverLog0, 0, HighSeq3 - 1, ?DCP_FLAG_NOFLAG,
            TestFun, [], AddStreamFun),
    {ok, PersistedDocs3, _} =
        couch_dcp_client:enum_docs_since(
            Pid, 0, InitialFailoverLog0, 0, HighSeq3, ?DCP_FLAG_DISKONLY,
            TestFun, [], AddStreamFun),
    etap:is(PersistedDocs3, ExpectedDocs3,
        "The persisted sequence number is correct, seq - 1"),
    ok.
482
% Regression test for MB-15922 and MB-17026: checks that the DCP client does
% not hang when the connection is dropped while a get all sequence numbers
% request or a stats request is in flight.
% Each request is issued from a helper process which reports back to this
% process; if no report arrives within 10 seconds the whole run is bailed out.
test_close_during_request() ->
    {auth, User, Passwd} = cb_auth_info:get(),
    {ok, Pid} = couch_dcp_client:start(
        test_set_name(), test_set_name(), User, Passwd, 1024, 0),

    Self = self(),

    % Get sequence numbers request
    SeqsRequest = fun() ->
        couch_dcp_fake_server:close_on_next(),
        Reply = couch_dcp_client:get_seqs(Pid, [0]),
        Self ! {ok, Reply}
    end,
    spawn(SeqsRequest),
    receive
    {ok, SeqsResult} ->
        etap:is(SeqsResult, {error, dcp_conn_closed},
            "The connection got (as expected) closed "
            "during the get sequence numbers request")
    after 10000 ->
         etap:bail("Cannot get sequence number on time, the DCP client hangs")
    end,

    % Stats request; only the fact that the call returns (or fails) in time
    % is checked, its result is discarded.
    StatsRequest = fun() ->
        couch_dcp_fake_server:close_on_next(),
        catch couch_dcp_client:get_num_items(Pid, 0),
        Self ! ok
    end,
    spawn(StatsRequest),
    receive
    ok ->
        etap:ok(true,
            "The connection got (as expected) closed during the stats request")
    after 10000 ->
         etap:bail("Cannot get stats on time, the DCP client hangs")
    end,
    ok.
526
527
% Asks the fake server for single mutations until the buffer size the client
% reports for ReqId is no longer below MaxSize. At most N mutations are
% requested; there is deliberately no clause for N =< 0, so running out of
% attempts crashes with function_clause.
try_until_throttled(Pid, ReqId, N, MaxSize) when N > 0 ->
    ok = couch_dcp_fake_server:send_single_mutation(),
    timer:sleep(1),
    Buffered = gen_server:call(Pid, {get_buffer_size, ReqId}),
    case MaxSize > Buffered of
    true ->
        try_until_throttled(Pid, ReqId, N - 1, MaxSize);
    false ->
        ok
    end.
538
% Reads events of stream ReqId from the client until the accumulated event
% size (starting at Size, measured with get_event_size/1) is at least MaxSize.
% Always reads at least one event. Returns `ok`.
try_until_unthrottled(Pid, ReqId, Size, MaxSize) ->
    Event = couch_dcp_client:get_stream_event(Pid, ReqId),
    Consumed = Size + get_event_size(Event),
    case Consumed < MaxSize of
    true ->
        try_until_unthrottled(Pid, ReqId, Consumed, MaxSize);
    false ->
        ok
    end.
548
% Recreates the databases of the test set from scratch and fills them with
% the test documents.
setup_test() ->
    SetName = test_set_name(),
    NumPartitions = num_set_partitions(),
    couch_set_view_test_util:delete_set_dbs(SetName, NumPartitions),
    couch_set_view_test_util:create_set_dbs(SetName, NumPartitions),
    populate_set().
553
% Returns true when every `{_, #dcp_doc{}}` item in Docs belongs to partition
% PartId (trivially true for an empty list). Items of any other shape crash
% the check with function_clause.
is_same_partition(PartId, Docs) ->
    BelongsToPart = fun
        ({_, #dcp_doc{partition = P}}) when P =:= PartId ->
            true;
        ({_, #dcp_doc{}}) ->
            false
    end,
    lists:all(BelongsToPart, Docs).
558
% Fetches one event for each stream request and returns one entry per
% request, in the order of StreamReqs:
%   drained - the client no longer knows the stream
%   [Event] - a single event to keep
%   []      - stream end, snapshot marker or anything unexpected
% The fold runs from the right, so the last request is polled first.
read_streams(Pid, StreamReqs) ->
    lists:foldr(fun(Request, Collected) ->
        Event = couch_dcp_client:get_stream_event(Pid, Request),
        [stream_event_items(Event) | Collected]
    end, [], StreamReqs).

% Maps a single get_stream_event/2 result onto the entry read_streams/2
% reports for it.
stream_event_items({error, vbucket_stream_not_found}) ->
    drained;
stream_event_items({stream_end, _}) ->
    [];
stream_event_items({snapshot_marker, _}) ->
    [];
stream_event_items({_, _} = Event) ->
    [Event];
stream_event_items(_) ->
    [].
577
% Polls all streams via read_streams/2 until every one of them is reported as
% drained. Acc holds one list per stream request (same order as StreamReqs);
% newly read events are prepended, so each resulting list has the most
% recently read event first.
read_mutations(Pid, StreamReqs, Acc) ->
    Batch = read_streams(Pid, StreamReqs),
    AllDrained = lists:all(
        fun(drained) -> true; (_) -> false end, Batch),
    if
    AllDrained ->
        Acc;
    true ->
        Merge = fun
            (drained, Seen) -> Seen;
            (Fresh, Seen) -> Fresh ++ Seen
        end,
        read_mutations(Pid, StreamReqs, lists:zipwith(Merge, Batch, Acc))
    end.
594
% Builds the binary document id for the integer I: "doc_" followed by I as a
% decimal number, zero-padded to 8 digits.
doc_id(I) ->
    Formatted = io_lib:format("doc_~8..0b", [I]),
    erlang:iolist_to_binary(Formatted).
597
% Returns the EJSON documents for the ids From..To (inclusive), in ascending
% order.
create_docs(From, To) ->
    [test_doc(I) || I <- lists:seq(From, To)].

% Builds one document: its id comes from doc_id/1, its value is I and its
% revision is "1-" followed by the lower-case hex dump of the 16 byte
% revision meta data (CAS = I, expiration = 0, flags = 0, native endianness).
test_doc(I) ->
    Meta = <<I:64/native, 0:32/native, 0:32/native>>,
    HexMeta = iolist_to_binary(
        [io_lib:format("~2.16.0b", [Byte]) || <<Byte:8>> <= Meta]),
    Rev = <<"1-", HexMeta/binary>>,
    {[
        {<<"meta">>, {[
            {<<"id">>, doc_id(I)},
            {<<"rev">>, Rev}
        ]}},
        {<<"json">>, {[{<<"value">>, I}]}}
    ]}.
616
% Creates num_docs() test documents and hands them, together with the list of
% all partition ids, to populate_set_sequentially/3.
populate_set() ->
    NumPartitions = num_set_partitions(),
    NumDocs = num_docs(),
    % lists:concat/1 renders the integers in decimal
    etap:diag(lists:concat([
        "Populating the ", NumPartitions,
        " databases with ", NumDocs, " documents"])),
    PartIds = lists:seq(0, NumPartitions - 1),
    ok = couch_set_view_test_util:populate_set_sequentially(
        test_set_name(), PartIds, create_docs(1, NumDocs)).
625
% Deletes the NumDocs documents with the ids StartId..StartId + NumDocs - 1.
% The deletions are grouped per partition and written with one
% couch_db:update_docs/3 call per partition; all partition databases are
% opened up front and closed at the end. Returns `ok`.
% NOTE(review): the partition is derived as Id div DocsPerPartition, so an id
% that is an exact multiple of DocsPerPartition maps to the next partition -
% confirm this matches how populate_set_sequentially/3 distributes documents.
delete_docs(StartId, NumDocs) ->
    OpenDb = fun(PartId) ->
        {ok, Db} = couch_set_view_test_util:open_set_db(
            test_set_name(), PartId),
        {PartId, Db}
    end,
    OpenDbs = dict:from_list(
        lists:map(OpenDb, lists:seq(0, num_set_partitions() - 1))),

    DocsPerPartition = num_docs() div num_set_partitions(),
    Ids = lists:seq(StartId, StartId + NumDocs - 1),
    % orddict of partition id -> deletion docs (most recently added first)
    Grouped = lists:foldl(fun(Id, Groups) ->
        Deletion = couch_doc:from_json_obj({[
            {<<"meta">>, {[{<<"deleted">>, true}, {<<"id">>, doc_id(Id)}]}},
            {<<"json">>, {[]}}
        ]}),
        orddict:update(Id div DocsPerPartition,
            fun(Existing) -> [Deletion | Existing] end, [Deletion], Groups)
    end, orddict:new(), Ids),

    % Write the deletions partition by partition, in ascending partition order
    lists:foreach(fun({PartId, PartDocs}) ->
        Db = dict:fetch(PartId, OpenDbs),
        etap:diag("Deleting " ++ integer_to_list(length(PartDocs)) ++
            " documents from partition " ++ integer_to_list(PartId)),
        ok = couch_db:update_docs(Db, PartDocs, [sort_docs])
    end, orddict:to_list(Grouped)),

    lists:foreach(fun({_, Db}) ->
        ok = couch_db:close(Db)
    end, dict:to_list(OpenDbs)).
662
663
% Returns the first element of the first entry of a failover log; the tests
% pass it to add_stream/6 as the partition UUID.
first_uuid(FailoverLog) ->
    FirstEntry = erlang:hd(FailoverLog),
    erlang:element(1, FirstEntry).
666
% Count times in a row: asks the fake server to send a single mutation and
% reads one event of Stream from the client. Both results are ignored.
% Returns `ok` once Count has reached 0.
receive_mutation(Count, Pid, Stream) ->
    case Count of
    0 ->
        ok;
    _ ->
        couch_dcp_fake_server:send_single_mutation(),
        couch_dcp_client:get_stream_event(Pid, Stream),
        receive_mutation(Count - 1, Pid, Stream)
    end.
673
% Returns the size accounted for a stream event `{Type, Doc}`: the fixed
% ?DCP_MSG_SIZE_* value of the event type plus, for mutations and deletions,
% erlang:external_size/1 of the document id (and of the body, for mutations).
% Any other event type crashes with case_clause.
get_event_size({Type, Doc}) ->
    case Type of
        stream_end ->
            ?DCP_MSG_SIZE_STREAM_END;
        snapshot_marker ->
            ?DCP_MSG_SIZE_SNAPSHOT;
        snapshot_deletion ->
            #dcp_doc{id = DocId} = Doc,
            ?DCP_MSG_SIZE_DELETION + erlang:external_size(DocId);
        snapshot_mutation ->
            #dcp_doc{id = DocId, body = Body} = Doc,
            IdSize = erlang:external_size(DocId),
            BodySize = erlang:external_size(Body),
            ?DCP_MSG_SIZE_MUTATION + IdSize + BodySize
    end.
692