#!/usr/bin/env escript
%% -*- Mode: Erlang; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
%%! -smp enable

% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-include_lib("couch_set_view/include/couch_set_view.hrl").


test_set_name() -> <<"couch_test_dcp_view_groups">>.
num_set_partitions() -> 4.
ddoc_id() -> <<"_design/test">>.
num_docs() -> 1024.  % keep it a multiple of num_set_partitions()
num_docs_pp() -> 1024 div num_set_partitions().
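% populate_set/2 below spreads the documents across all partition databases,
% so each partition is expected to end up with num_docs_pp() documents.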


main(_) ->
    test_util:init_code_path(),

    etap:plan(16),
    case (catch test()) of
        ok ->
            etap:end_tests();
        Other ->
            etap:diag(io_lib:format("Test died abnormally: ~p", [Other])),
            etap:bail(Other)
    end,
    ok.


test() ->
    couch_set_view_test_util:start_server(test_set_name()),
    etap:diag("Testing DCP with regard to view groups"),

    test_partition_versions_update(),
    test_rollback_different_heads(),
    test_persisted_items(),
    test_multiple_snapshots(),
    test_duplicates(),

    couch_set_view_test_util:stop_server(),
    ok.

test_partition_versions_update() ->
    etap:diag("Testing that the view partition versions get updated"),

    setup_test(),
    {auth, User, Passwd} = cb_auth_info:get(),
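    % Note: by argument position, the 20*1024*1024 below is assumed to be the
    % DCP flow control buffer size and the trailing 0 the connection flags.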
    {ok, Pid} = couch_dcp_client:start(
            test_set_name(), test_set_name(), User, Passwd, 20*1024*1024, 0),

    {ok, InitialFailoverLog1} = couch_dcp_client:get_failover_log(Pid, 1),
    {ok, InitialFailoverLog2} = couch_dcp_client:get_failover_log(Pid, 2),
    {ok, {_ViewResults1}} = couch_set_view_test_util:query_view(
        test_set_name(), ddoc_id(), <<"test">>, []),

    GroupFailoverLog1 = get_group_failover_log(1),
    GroupFailoverLog2 = get_group_failover_log(2),
    etap:is(GroupFailoverLog1, InitialFailoverLog1,
        "Group failover log of partition 1 is the same as "
        "initial failover log"),
    etap:is(GroupFailoverLog2, InitialFailoverLog2,
        "Group failover log of partition 2 is the same as "
        "initial failover log"),

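    % Append an extra {PartUuid, Seq} entry so that the server's failover log
    % for partition 2 diverges from the one the view group has stored.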
    FailoverLog2 = InitialFailoverLog2 ++ [{222331, 10}],
    couch_dcp_fake_server:set_failover_log(2, FailoverLog2),
    % Insert new docs so that the updater is run on the new query
    populate_set(num_docs() + 1, 2 * num_docs()),
    {ok, {_ViewResults2}} = couch_set_view_test_util:query_view(
        test_set_name(), ddoc_id(), <<"test">>, []),

    GroupFailoverLog1b = get_group_failover_log(1),
    GroupFailoverLog2b = get_group_failover_log(2),
    etap:is(GroupFailoverLog1b, InitialFailoverLog1,
        "Group failover log of partition 1 is still the same as "
        "initial failover log"),
    etap:is(GroupFailoverLog2b, FailoverLog2,
        "Group failover log of partition 2 got correctly updated"),

    shutdown_group().


test_rollback_different_heads() ->
    % The test case: server and client share a common history, but their most
    % recent failover log entries differ, and the server's most recent entry
    % has a lower high sequence number than the client's. The client needs to
    % retry with an older version of its failover log, then a rollback should
    % happen, and finally the indexing should catch up again.
    etap:diag("Testing a rollback where the server and the client have "
        "a common history except for the most recent one, where both differ"),

    % Give the DCP server a failover log we can diverge from
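    % Each entry is a {PartUuid, StartSeq} pair, most recent entry first,
    % mirroring the format of a DCP failover log.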
    FailoverLog = [
        {10001, (num_docs_pp() * 2)},
        {10002, num_docs_pp()},
        {10003, 0}],

    {ViewResultNoRollback, FailoverLogNoRollback} = rollback_different_heads(
        dont_force_a_rollback, FailoverLog),
    {ViewResultRollback, FailoverLogRollback} = rollback_different_heads(
        force_a_rollback, FailoverLog),
    etap:is(ViewResultRollback, ViewResultNoRollback,
        "View results are the same with and without a rollback"),
    etap:isnt(FailoverLogRollback, FailoverLogNoRollback,
        "The failover log is different between the two runs"),
    ok.

rollback_different_heads(DoRollback, FailoverLog) ->
    Msg = case DoRollback of
    dont_force_a_rollback ->
        "Query data without rollback";
    force_a_rollback ->
        "Query data with rollback"
    end,
    etap:diag(Msg),

    setup_test(),
    PartId = 1,
    couch_dcp_fake_server:set_failover_log(PartId, FailoverLog),

    % Update the index twice, so that there are headers to roll back to
    {ok, {_ViewResults1}} = couch_set_view_test_util:query_view(
        test_set_name(), ddoc_id(), <<"test">>, []),
    populate_set(num_docs() + 1, 2 * num_docs()),
    {ok, {_ViewResults2}} = couch_set_view_test_util:query_view(
        test_set_name(), ddoc_id(), <<"test">>, []),
    GroupFailoverLog = get_group_failover_log(PartId),
    etap:is(GroupFailoverLog, FailoverLog,
        "Group has initially the correct failover log"),

    case DoRollback of
    dont_force_a_rollback ->
        FailoverLog2 = FailoverLog;
    force_a_rollback ->
        % Replace the head of the server's failover log with an entry the
        % client doesn't have, so that a rollback is needed
        FailoverLog2 = [{777888999, num_docs_pp() + 10}] ++
            tl(FailoverLog),
        couch_dcp_fake_server:set_failover_log(PartId, FailoverLog2)
    end,

    % Insert new docs so that the updater is run on the new query
    populate_set((num_docs() * 2) + 1, 3 * num_docs()),
    {ok, {ViewResults3}} = couch_set_view_test_util:query_view(
        test_set_name(), ddoc_id(), <<"test">>, []),
    GroupFailoverLog2 = get_group_failover_log(PartId),
    etap:is(GroupFailoverLog2, FailoverLog2,
        "Group has correct failover log after it might have changed"),

    shutdown_group(),
    {ViewResults3, FailoverLog2}.


test_persisted_items() ->
    etap:diag("Test the initial index build with a persisted sequence number "
        "which is lower than the current high sequence"),

    % First query with persisted sequence number == high sequence number
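    % set_persisted_items_fun/1 tells the fake server, given a partition's
    % high sequence number, which sequence number to report as persisted on
    % disk; the identity fun means every item counts as persisted.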
    setup_test(),
    couch_dcp_fake_server:set_persisted_items_fun(fun(Seq) -> Seq end),
    {ok, {ViewResults1}} = couch_set_view_test_util:query_view(
        test_set_name(), ddoc_id(), <<"test">>, ["stale=false"]),
    shutdown_group(),

    % Then with persisted sequence number == high sequence number / 2
    setup_test(),
    couch_dcp_fake_server:set_persisted_items_fun(
        fun(Seq) -> max(Seq div 2, 1) end),
    {ok, {ViewResults2}} = couch_set_view_test_util:query_view(
        test_set_name(), ddoc_id(), <<"test">>, ["stale=false"]),
    etap:is(ViewResults1, ViewResults2,
        "Having a persisted sequence number lower than the high sequence "
        "number doesn't make a difference (a)"),
    shutdown_group(),

    % Then with persisted sequence number == high sequence number - 1
    setup_test(),
    couch_dcp_fake_server:set_persisted_items_fun(
        fun(Seq) -> max(Seq - 1, 1) end),
    {ok, {ViewResults3}} = couch_set_view_test_util:query_view(
        test_set_name(), ddoc_id(), <<"test">>, ["stale=false"]),
    etap:is(ViewResults1, ViewResults3,
        "Having a persisted sequence number lower than the high sequence "
        "number doesn't make a difference (b)"),
    shutdown_group(),
    ok.


test_multiple_snapshots() ->
    etap:diag("Test the index build with receiving several snapshots"),

    % First query with the result returning in a single snapshot
    setup_test(),
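    % An items_per_snapshot value of 0 makes the fake server deliver all
    % mutations within one single snapshot.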
    couch_dcp_fake_server:set_items_per_snapshot(0),
    {ok, {ViewResults1}} = couch_set_view_test_util:query_view(
        test_set_name(), ddoc_id(), <<"test">>, ["stale=false"]),
    shutdown_group(),

    % Then with the result returning in several snapshots
    setup_test(),
    couch_dcp_fake_server:set_items_per_snapshot(num_docs_pp() div 4),
    {ok, {ViewResults2}} = couch_set_view_test_util:query_view(
        test_set_name(), ddoc_id(), <<"test">>, ["stale=false"]),
    etap:is(ViewResults1, ViewResults2,
        "The results of the single snapshot are the same as with multiple "
        "snapshots (a)"),
    shutdown_group(),

    % Try again with some other number of snapshots
    setup_test(),
    couch_dcp_fake_server:set_items_per_snapshot(num_docs_pp() div 3),
    {ok, {ViewResults3}} = couch_set_view_test_util:query_view(
        test_set_name(), ddoc_id(), <<"test">>, ["stale=false"]),
    etap:is(ViewResults1, ViewResults3,
        "The results of the single snapshot are the same as with multiple "
        "snapshots (b)"),
    shutdown_group(),
    ok.


test_duplicates() ->
    etap:diag("Test the index build with receiving duplicates within several "
      "snapshots"),

    % First query with the result returning in a single snapshot
    setup_test(),
    couch_dcp_fake_server:set_items_per_snapshot(0),
    {ok, {ViewResults1}} = couch_set_view_test_util:query_view(
        test_set_name(), ddoc_id(), <<"test">>, ["stale=false"]),
    shutdown_group(),

    % Then with a stream that contains duplicates across several snapshots
    setup_test(),
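    % dups_per_snapshot is assumed to make the fake server re-send that many
    % already-seen mutations per snapshot, so the indexer has to deduplicate.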
    couch_dcp_fake_server:set_items_per_snapshot(num_docs_pp() div 4),
    couch_dcp_fake_server:set_dups_per_snapshot(num_docs_pp() div 9),
    {ok, {ViewResults2}} = couch_set_view_test_util:query_view(
        test_set_name(), ddoc_id(), <<"test">>, ["stale=false"]),
    etap:is(ViewResults2, ViewResults1,
        "The results of the single snapshot are the same as with multiple "
        "snapshots containing duplicates (a)"),
    shutdown_group(),

    % Try again with some other number of duplicates
    setup_test(),
    couch_dcp_fake_server:set_items_per_snapshot(num_docs_pp() div 3),
    couch_dcp_fake_server:set_dups_per_snapshot(num_docs_pp() div 10),
    {ok, {ViewResults3}} = couch_set_view_test_util:query_view(
        test_set_name(), ddoc_id(), <<"test">>, ["stale=false"]),
    etap:is(ViewResults3, ViewResults1,
        "The results of the single snapshot are the same as with multiple "
        "snapshots containing duplicates (b)"),
    shutdown_group(),
    ok.


setup_test() ->
    couch_set_view_test_util:delete_set_dbs(test_set_name(), num_set_partitions()),
    couch_set_view_test_util:create_set_dbs(test_set_name(), num_set_partitions()),
    populate_set(1, num_docs()),

    DDoc = {[
        {<<"meta">>, {[{<<"id">>, ddoc_id()}]}},
        {<<"json">>, {[
            {<<"views">>, {[
                {<<"test">>, {[
                    {<<"map">>, <<"function(doc, meta) { emit(meta.id, doc.value); }">>}
                ]}}
            ]}}
        ]}}
    ]},
    ok = couch_set_view_test_util:update_ddoc(test_set_name(), DDoc),
    ok = configure_view_group().

shutdown_group() ->
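    % Reset the fake server first, so that customised settings such as
    % failover logs or snapshot behaviour don't carry over into the next test.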
    couch_dcp_fake_server:reset(),
    GroupPid = couch_set_view:get_group_pid(
        mapreduce_view, test_set_name(), ddoc_id(), prod),
    couch_set_view_test_util:delete_set_dbs(test_set_name(), num_set_partitions()),
    MonRef = erlang:monitor(process, GroupPid),
    receive
    {'DOWN', MonRef, _, _, _} ->
        ok
    after 10000 ->
        etap:bail("Timeout waiting for group shutdown")
    end.


populate_set(From, To) ->
    etap:diag("Populating the " ++ integer_to_list(num_set_partitions()) ++
        " databases with " ++ integer_to_list(To - From + 1) ++ " documents"),
    DocList = create_docs(From, To),
    ok = couch_set_view_test_util:populate_set_sequentially(
        test_set_name(),
        lists:seq(0, num_set_partitions() - 1),
        DocList).

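% Zero-padded ids, e.g. doc_id(1) -> <<"doc_00000001">>, keep the
% lexicographical document order in line with the numeric one.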
doc_id(I) ->
    iolist_to_binary(io_lib:format("doc_~8..0b", [I])).

create_docs(From, To) ->
    lists:map(
        fun(I) ->
            Cas = I,
            ExpireTime = 0,
            Flags = 0,
            % Pack CAS, expiration time and flags the way the revision
            % metadata is stored, then hex-encode them for the rev string
            RevMeta1 = <<Cas:64/native, ExpireTime:32/native, Flags:32/native>>,
            RevMeta2 = [io_lib:format("~2.16.0b", [X]) || <<X:8>> <= RevMeta1],
            RevMeta3 = iolist_to_binary(RevMeta2),
            {[
              {<<"meta">>, {[
                             {<<"id">>, doc_id(I)},
                             {<<"rev">>, <<"1-", RevMeta3/binary>>}
                            ]}},
              {<<"json">>, {[{<<"value">>, I}]}}
            ]}
        end,
        lists:seq(From, To)).


configure_view_group() ->
    etap:diag("Configuring view group"),
    Params = #set_view_params{
        max_partitions = num_set_partitions(),
        active_partitions = lists:seq(0, num_set_partitions()-1),
        passive_partitions = [],
        use_replica_index = false
    },
    try
        couch_set_view:define_group(
            mapreduce_view, test_set_name(), ddoc_id(), Params)
    catch _:Error ->
        Error
    end.


get_group_info() ->
    GroupPid = couch_set_view:get_group_pid(
        mapreduce_view, test_set_name(), ddoc_id(), prod),
    {ok, GroupInfo} = couch_set_view_group:request_group_info(GroupPid),
    GroupInfo.

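% Extract the failover log of a single partition from the group info.
% Partition ids arrive as binaries and failover log entries as lists,
% hence the conversions below.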
get_group_failover_log(PartId) ->
    GroupInfo = get_group_info(),
    {partition_versions, {PartVersions0}} = lists:keyfind(
        partition_versions, 1, GroupInfo),
    PartVersions = lists:map(fun({PartId0, PartVersion}) ->
        {list_to_integer(binary_to_list(PartId0)),
            [list_to_tuple(V) || V <- PartVersion]}
    end, PartVersions0),
    {PartId, FailoverLog} = lists:keyfind(PartId, 1, PartVersions),
    FailoverLog.