#!/usr/bin/env escript
%% -*- Mode: Erlang; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
%%! -smp enable

% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-include_lib("couch_set_view/include/couch_set_view.hrl").


test_set_name() -> <<"couch_test_dcp_view_groups">>.
num_set_partitions() -> 4.
ddoc_id() -> <<"_design/test">>.
num_docs() -> 1024.  % keep it a multiple of num_set_partitions()
num_docs_pp() -> num_docs() div num_set_partitions().


main(_) ->
    test_util:init_code_path(),

    etap:plan(16),
    case (catch test()) of
        ok ->
            etap:end_tests();
        Other ->
            etap:diag(io_lib:format("Test died abnormally: ~p", [Other])),
            etap:bail(Other)
    end,
    ok.


test() ->
    couch_set_view_test_util:start_server(test_set_name()),
    etap:diag("Testing DCP in the context of view groups"),

    test_partition_versions_update(),
    test_rollback_different_heads(),
    test_persisted_items(),
    test_multiple_snapshots(),
    test_duplicates(),

    couch_set_view_test_util:stop_server(),
    ok.

test_partition_versions_update() ->
    etap:diag("Testing that the view partition versions get updated"),

    setup_test(),
    {auth, User, Passwd} = cb_auth_info:get(),
    {ok, Pid} = couch_dcp_client:start(
            test_set_name(), test_set_name(), User, Passwd, 20*1024*1024, 0),

    {ok, InitialFailoverLog1} = couch_dcp_client:get_failover_log(Pid, 1),
    {ok, InitialFailoverLog2} = couch_dcp_client:get_failover_log(Pid, 2),
    {ok, {_ViewResults1}} = couch_set_view_test_util:query_view(
        test_set_name(), ddoc_id(), <<"test">>, []),

    GroupFailoverLog1 = get_group_failover_log(1),
    GroupFailoverLog2 = get_group_failover_log(2),
    etap:is(GroupFailoverLog1, InitialFailoverLog1,
        "Group failover log of partition 1 is the same as "
        "initial failover log"),
    etap:is(GroupFailoverLog2, InitialFailoverLog2,
        "Group failover log of partition 2 is the same as "
        "initial failover log"),

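    % Simulate a failover on the server side: give partition 2 a failover log
    % with an additional {UUID, SeqNo} entry that the view group hasn't seen
    % yet. The group should pick it up on the next updater run.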
    FailoverLog2 = InitialFailoverLog2 ++ [{222331, 10}],
    couch_dcp_fake_server:set_failover_log(2, FailoverLog2),
    % Insert new docs so that the updater runs on the next query
    populate_set(num_docs() + 1, 2 * num_docs()),
    {ok, {_ViewResults2}} = couch_set_view_test_util:query_view(
        test_set_name(), ddoc_id(), <<"test">>, []),

    GroupFailoverLog1b = get_group_failover_log(1),
    GroupFailoverLog2b = get_group_failover_log(2),
    etap:is(GroupFailoverLog1b, InitialFailoverLog1,
        "Group failover log of partition 1 is still the same as "
        "initial failover log"),
    etap:is(GroupFailoverLog2b, FailoverLog2,
        "Group failover log of partition 2 got correctly updated"),

    shutdown_group().


test_rollback_different_heads() ->
    % The test case: server and client share a common failover log history,
    % but their most recent entries differ. The server's most recent entry
    % has a lower high sequence number than the client's, so the client needs
    % to retry with an older version of its failover log. A rollback should
    % then happen, and finally the indexing should catch up again.
    etap:diag("Testing a rollback where the server and the client have "
        "a common history except for the most recent entry, where they differ"),

    % Give the DCP server a failover log we can diverge from
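    % Each entry is a {PartitionUUID, StartSeqno} pair, most recent first
    % (the head is what gets replaced below to force the rollback)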
    FailoverLog = [
        {10001, (num_docs_pp() * 2)},
        {10002, num_docs_pp()},
        {10003, 0}],

    {ViewResultNoRollback, FailoverLogNoRollback} = rollback_different_heads(
        dont_force_a_rollback, FailoverLog),
    {ViewResultRollback, FailoverLogRollback} = rollback_different_heads(
        force_a_rollback, FailoverLog),
    etap:is(ViewResultRollback, ViewResultNoRollback,
        "View results are the same with and without a rollback"),
    etap:isnt(FailoverLogRollback, FailoverLogNoRollback,
        "The failover log is different between the two runs"),
    ok.

rollback_different_heads(DoRollback, FailoverLog) ->
    Msg = case DoRollback of
    dont_force_a_rollback ->
        "Query data without rollback";
    force_a_rollback ->
        "Query data with rollback"
    end,
    etap:diag(Msg),

    setup_test(),
    PartId = 1,
    couch_dcp_fake_server:set_failover_log(PartId, FailoverLog),

    % Update the index twice, so that there are headers to roll back to
    {ok, {_ViewResults1}} = couch_set_view_test_util:query_view(
        test_set_name(), ddoc_id(), <<"test">>, []),
    populate_set(num_docs() + 1, 2 * num_docs()),
    {ok, {_ViewResults2}} = couch_set_view_test_util:query_view(
        test_set_name(), ddoc_id(), <<"test">>, []),
    GroupFailoverLog = get_group_failover_log(PartId),
    etap:is(GroupFailoverLog, FailoverLog,
        "Group initially has the correct failover log"),

    case DoRollback of
    dont_force_a_rollback ->
        FailoverLog2 = FailoverLog;
    force_a_rollback ->
        % Replace the head of the server's failover log with an entry the
        % client doesn't have, so that a rollback is needed
        FailoverLog2 = [{777888999, num_docs_pp() + 10}] ++
            tl(FailoverLog),
        couch_dcp_fake_server:set_failover_log(PartId, FailoverLog2)
    end,

    % Insert new docs so that the updater runs on the next query
    populate_set((num_docs() * 2) + 1, 3 * num_docs()),
    {ok, {ViewResults3}} = couch_set_view_test_util:query_view(
        test_set_name(), ddoc_id(), <<"test">>, []),
    GroupFailoverLog2 = get_group_failover_log(PartId),
    etap:is(GroupFailoverLog2, FailoverLog2,
        "Group has the correct failover log after the possible change"),

    shutdown_group(),
    {ViewResults3, FailoverLog2}.


test_persisted_items() ->
    etap:diag("Test the initial index build with a persisted sequence number "
        "which is lower than the current high sequence number"),

    % First query with persisted sequence number == high sequence number
    setup_test(),
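    % The fun maps the current high sequence number to the sequence number
    % the fake server reports as persisted; the identity fun means every
    % item is already persisted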
    couch_dcp_fake_server:set_persisted_items_fun(fun(Seq) -> Seq end),
    {ok, {ViewResults1}} = couch_set_view_test_util:query_view(
        test_set_name(), ddoc_id(), <<"test">>, ["stale=false"]),
    shutdown_group(),

    % Then with persisted sequence number == high sequence number / 2
    setup_test(),
    couch_dcp_fake_server:set_persisted_items_fun(
        fun(Seq) -> max(Seq div 2, 1) end),
    {ok, {ViewResults2}} = couch_set_view_test_util:query_view(
        test_set_name(), ddoc_id(), <<"test">>, ["stale=false"]),
    etap:is(ViewResults1, ViewResults2,
        "Having a persisted sequence number lower than the high sequence "
        "number doesn't make a difference (a)"),
    shutdown_group(),

    % Then with persisted sequence number == high sequence number - 1
    setup_test(),
    couch_dcp_fake_server:set_persisted_items_fun(
        fun(Seq) -> max(Seq - 1, 1) end),
    {ok, {ViewResults3}} = couch_set_view_test_util:query_view(
        test_set_name(), ddoc_id(), <<"test">>, ["stale=false"]),
    etap:is(ViewResults1, ViewResults3,
        "Having a persisted sequence number lower than the high sequence "
        "number doesn't make a difference (b)"),
    shutdown_group(),
    ok.


test_multiple_snapshots() ->
    etap:diag("Test the index build with receiving several snapshots"),

    % First query with the result returned in a single snapshot
    setup_test(),
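    % 0 items per snapshot means the fake server delivers everything in a
    % single snapshot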
    couch_dcp_fake_server:set_items_per_snapshot(0),
    {ok, {ViewResults1}} = couch_set_view_test_util:query_view(
        test_set_name(), ddoc_id(), <<"test">>, ["stale=false"]),
    shutdown_group(),

    % Then with the result returned in several snapshots
    setup_test(),
    couch_dcp_fake_server:set_items_per_snapshot(num_docs_pp() div 4),
    {ok, {ViewResults2}} = couch_set_view_test_util:query_view(
        test_set_name(), ddoc_id(), <<"test">>, ["stale=false"]),
    etap:is(ViewResults1, ViewResults2,
        "The results of the single snapshot are the same as with multiple "
        "snapshots (a)"),
    shutdown_group(),

    % Try again with a different number of snapshots
    setup_test(),
    couch_dcp_fake_server:set_items_per_snapshot(num_docs_pp() div 3),
    {ok, {ViewResults3}} = couch_set_view_test_util:query_view(
        test_set_name(), ddoc_id(), <<"test">>, ["stale=false"]),
    etap:is(ViewResults1, ViewResults3,
        "The results of the single snapshot are the same as with multiple "
        "snapshots (b)"),
    shutdown_group(),
    ok.


test_duplicates() ->
    etap:diag("Test the index build with receiving duplicates within several "
      "snapshots"),

    % First query with the result returned in a single snapshot
    setup_test(),
    couch_dcp_fake_server:set_items_per_snapshot(0),
    {ok, {ViewResults1}} = couch_set_view_test_util:query_view(
        test_set_name(), ddoc_id(), <<"test">>, ["stale=false"]),
    shutdown_group(),

    % Then with a stream that contains duplicates
    setup_test(),
    couch_dcp_fake_server:set_items_per_snapshot(num_docs_pp() div 4),
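    % Have the fake server re-send some items within the snapshots; the index
    % build must produce the same result despite the duplicates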
    couch_dcp_fake_server:set_dups_per_snapshot(num_docs_pp() div 9),
    {ok, {ViewResults2}} = couch_set_view_test_util:query_view(
        test_set_name(), ddoc_id(), <<"test">>, ["stale=false"]),
    etap:is(ViewResults2, ViewResults1,
        "The results of the single snapshot are the same as with multiple "
        "snapshots containing duplicates (a)"),
    shutdown_group(),

    % Try again with a different number of duplicates
    setup_test(),
    couch_dcp_fake_server:set_items_per_snapshot(num_docs_pp() div 3),
    couch_dcp_fake_server:set_dups_per_snapshot(num_docs_pp() div 10),
    {ok, {ViewResults3}} = couch_set_view_test_util:query_view(
        test_set_name(), ddoc_id(), <<"test">>, ["stale=false"]),
    etap:is(ViewResults3, ViewResults1,
        "The results of the single snapshot are the same as with multiple "
        "snapshots containing duplicates (b)"),
    shutdown_group(),
    ok.


setup_test() ->
    couch_set_view_test_util:delete_set_dbs(test_set_name(), num_set_partitions()),
    couch_set_view_test_util:create_set_dbs(test_set_name(), num_set_partitions()),
    populate_set(1, num_docs()),

    DDoc = {[
        {<<"meta">>, {[{<<"id">>, ddoc_id()}]}},
        {<<"json">>, {[
            {<<"views">>, {[
                {<<"test">>, {[
                    {<<"map">>, <<"function(doc, meta) { emit(meta.id, doc.value); }">>}
                ]}}
            ]}}
        ]}}
    ]},
    ok = couch_set_view_test_util:update_ddoc(test_set_name(), DDoc),
    ok = configure_view_group().

shutdown_group() ->
    couch_dcp_fake_server:reset(),
    GroupPid = couch_set_view:get_group_pid(
        mapreduce_view, test_set_name(), ddoc_id(), prod),
    couch_set_view_test_util:delete_set_dbs(test_set_name(), num_set_partitions()),
    MonRef = erlang:monitor(process, GroupPid),
    receive
    {'DOWN', MonRef, _, _, _} ->
        ok
    after 10000 ->
        etap:bail("Timeout waiting for group shutdown")
    end.


populate_set(From, To) ->
    etap:diag("Populating the " ++ integer_to_list(num_set_partitions()) ++
        " databases with " ++ integer_to_list(To - From + 1) ++ " documents"),
    DocList = create_docs(From, To),
    ok = couch_set_view_test_util:populate_set_sequentially(
        test_set_name(),
        lists:seq(0, num_set_partitions() - 1),
        DocList).

doc_id(I) ->
    iolist_to_binary(io_lib:format("doc_~8..0b", [I])).

create_docs(From, To) ->
    lists:map(
        fun(I) ->
            Cas = I,
            ExpireTime = 0,
            Flags = 0,
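            % Build the 16-byte revision metadata (CAS, expiration time and
            % flags) and hex-encode it for the doc's "rev" field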
            RevMeta1 = <<Cas:64/native, ExpireTime:32/native, Flags:32/native>>,
            RevMeta2 = [io_lib:format("~2.16.0b", [X]) || <<X:8>> <= RevMeta1],
            RevMeta3 = iolist_to_binary(RevMeta2),
            {[
              {<<"meta">>, {[
                             {<<"id">>, doc_id(I)},
                             {<<"rev">>, <<"1-", RevMeta3/binary>>}
                            ]}},
              {<<"json">>, {[{<<"value">>, I}]}}
            ]}
        end,
        lists:seq(From, To)).


configure_view_group() ->
    etap:diag("Configuring view group"),
    Params = #set_view_params{
        max_partitions = num_set_partitions(),
        active_partitions = lists:seq(0, num_set_partitions()-1),
        passive_partitions = [],
        use_replica_index = false
    },
    try
        couch_set_view:define_group(
            mapreduce_view, test_set_name(), ddoc_id(), Params)
    catch _:Error ->
        Error
    end.


get_group_info() ->
    GroupPid = couch_set_view:get_group_pid(
        mapreduce_view, test_set_name(), ddoc_id(), prod),
    {ok, GroupInfo} = couch_set_view_group:request_group_info(GroupPid),
    GroupInfo.

get_group_failover_log(PartId) ->
    GroupInfo = get_group_info(),
    {partition_versions, {PartVersions0}} = lists:keyfind(
        partition_versions, 1, GroupInfo),
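    % The group info is decoded JSON: partition ids arrive as binaries and
    % failover log entries as lists, so convert them back to integers and
    % tuples before comparing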
    PartVersions = lists:map(fun({PartId0, PartVersion}) ->
        {list_to_integer(binary_to_list(PartId0)),
            [list_to_tuple(V) || V <- PartVersion]}
    end, PartVersions0),
    {PartId, FailoverLog} = lists:keyfind(PartId, 1, PartVersions),
    FailoverLog.