1#!/usr/bin/env escript
2%% -*- Mode: Erlang; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
3%%! -smp enable
4
5% Licensed under the Apache License, Version 2.0 (the "License"); you may not
6% use this file except in compliance with the License. You may obtain a copy of
7% the License at
8%
9%   http://www.apache.org/licenses/LICENSE-2.0
10%
11% Unless required by applicable law or agreed to in writing, software
12% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14% License for the specific language governing permissions and limitations under
15% the License.
16
17-include_lib("couch_set_view/include/couch_set_view.hrl").
18-define(MAX_WAIT_TIME, 10 * 1000).
19
% Name of the database set that every test in this file operates on.
test_set_name() ->
    <<"couch_test_set_index_rollback">>.

% Number of partition databases created for the test set.
num_set_partitions() ->
    4.

% ID of the design document holding the test views.
ddoc_id() ->
    <<"_design/test">>.

% Base document count used when populating the set.
% Keep it a multiple of num_set_partitions().
num_docs() ->
    128.
24
25
% escript entry point. Initialises the code path, announces the planned
% number of etap assertions and runs test/0. Anything other than a plain
% `ok` coming out of test/0 (including a caught exception) is printed and
% makes the run bail out.
main(_Args) ->
    test_util:init_code_path(),

    etap:plan(66),
    Outcome = (catch test()),
    case Outcome =:= ok of
        true ->
            etap:end_tests();
        false ->
            etap:diag(io_lib:format("Test died abnormally: ~p", [Outcome])),
            etap:bail(Outcome)
    end,
    ok.
38
39
% Starts the test server, runs every rollback scenario in a fixed order
% and stops the server again. Returns `ok` when all scenarios ran through.
test() ->
    couch_set_view_test_util:start_server(test_set_name()),

    etap:diag("Testing rollback of indexes"),

    % The scenarios are executed strictly in list order.
    Scenarios = [
        fun() -> test_rollback_once(10) end,
        % Test with a header > 4k
        fun() -> test_rollback_once(5000) end,
        fun() -> test_rollback_multiple(3, 5) end,
        fun() -> test_rollback_multiple(2, 9) end,
        fun() -> test_rollback_different_seqs(5, 8) end,
        fun() -> test_rollback_different_seqs(1, 6) end,
        fun() -> test_rollback_not_exactly() end,
        fun() -> test_rollback_to_first_header() end,
        fun() -> test_rollback_during_compaction() end,
        fun() -> test_rollback_unindexable_seqs() end,
        fun() -> test_rollback_nonexistent() end,
        fun() -> test_rollback_never_existed() end,
        fun() -> test_rollback_mark_for_cleanup() end,
        fun() -> test_rollback_multiple_partitions() end,
        fun() -> test_rollback_multiple_partitions_missing() end,
        fun() -> test_rollback_unindexable_now_indexable_seqs() end,
        fun() -> test_rollback_added_partitions() end
    ],
    lists:foreach(fun(Scenario) -> Scenario() end, Scenarios),

    couch_set_view_test_util:stop_server(),
    ok.
66
67
% Indexes two batches of documents, rolls partition 0 back to the sequence
% number it had after the first batch and checks that the on-disk header
% and the "testred" view result both match the state after that first batch.
% ReduceSize is passed on to setup_test/1.
test_rollback_once(ReduceSize) ->
    etap:diag("Testing rollback to a previous header"),

    setup_test(ReduceSize),

    % First batch: remember the view result and the group sequence numbers

    populate_set(1, num_docs()),
    {ok, {ResultsBatch1}} = couch_set_view_test_util:query_view(
        test_set_name(), ddoc_id(), <<"testred">>, []),
    SeqsBatch1 = get_seq_from_group(),

    % Second batch: the view result has to change

    populate_set(num_docs() + 1, 2 * num_docs()),
    {ok, {ResultsBatch2}} = couch_set_view_test_util:query_view(
        test_set_name(), ddoc_id(), <<"testred">>, []),
    etap:isnt(ResultsBatch1, ResultsBatch2, "Results are different"),
    SeqsBatch2 = get_seq_from_group(),

    % The newest header on disk has to agree with the group
    IndexFd = get_fd(),
    {ok, LatestBin, _} = couch_file:read_header_bin(IndexFd),
    LatestHeader = couch_set_view_util:header_bin_to_term(LatestBin),
    etap:is(LatestHeader#set_view_index_header.seqs, SeqsBatch2,
        "Latest stored header matches group header"),

    % Roll back to the sequence number partition 0 had after the first batch
    RollbackPart = 0,
    RollbackSeq = couch_set_view_util:get_part_seq(RollbackPart, SeqsBatch1),
    ok = rollback_group([{RollbackPart, RollbackSeq}]),
    {ok, TruncatedBin, _} = couch_file:read_header_bin(IndexFd),
    TruncatedHeader = couch_set_view_util:header_bin_to_term(TruncatedBin),
    etap:is(TruncatedHeader#set_view_index_header.seqs, SeqsBatch1,
        "The most recent header of the truncated file has the correct "
        "sequence numbers"),

    % Querying makes the group start up; stale=ok keeps it from re-indexing,
    % so the result shows whether the truncation was successful
    {ok, {ResultsAfter}} = couch_set_view_test_util:query_view(
        test_set_name(), ddoc_id(), <<"testred">>, ["stale=ok"]),
    etap:is(ResultsAfter, ResultsBatch1,
        "View returns the correct value after trunction"),
    shutdown_group().
112
113
% Indexes several batches via insert_data/1 and rolls back to the state
% recorded at position From of the returned list (see rollback/2).
test_rollback_multiple(From, NumRollback) ->
    etap:diag("Testing rollback to a previous header (longer ago)"),
    setup_test(25),

    Batches = insert_data(From + NumRollback),
    rollback(Batches, From),
    shutdown_group().
121
122
% Same as test_rollback_multiple/2, but the batches are inserted with
% insert_data_randomly/1 after seeding the `random` generator with a fixed
% seed, so the run is repeatable.
test_rollback_different_seqs(From, NumRollback) ->
    etap:diag("Testing rollback when sequence numbers are different"),
    setup_test(25),

    random:seed({4, 5, 6}),
    Batches = insert_data_randomly(From + NumRollback),
    rollback(Batches, From),
    shutdown_group().
131
132
% Rolls partition 0 back to a sequence number that lies between two stored
% headers and checks that the index ends up at the earlier of the two.
test_rollback_not_exactly() ->
    etap:diag("Testing rollback to a previous header which doesn't "
      "match the given sequence number exactly"),
    setup_test(30),

    Batches = insert_data(3),
    % Get the file descriptor right here as it triggers the updater
    IndexFd = get_fd(),

    % Build, per partition, a sequence number halfway between the first and
    % the second recorded batch. Rolling back to it has to land on the
    % first batch.
    {FirstSeqs, FirstResult} = lists:nth(1, Batches),
    {SecondSeqs, _} = lists:nth(2, Batches),
    Halfway = fun({Part, Low}, {Part, High}) ->
        {Part, Low + ((High - Low) div 2)}
    end,
    MidSeqs = lists:zipwith(Halfway, FirstSeqs, SecondSeqs),

    % Rollback the index
    RollbackPart = 0,
    RollbackSeq = couch_set_view_util:get_part_seq(RollbackPart, MidSeqs),
    ok = rollback_group([{RollbackPart, RollbackSeq}]),
    {ok, TruncatedBin, _} = couch_file:read_header_bin(IndexFd),
    TruncatedHeader = couch_set_view_util:header_bin_to_term(TruncatedBin),
    etap:is(TruncatedHeader#set_view_index_header.seqs, FirstSeqs,
        "The header of the truncated file has the correct sequence numbers "
        "of the first batch"),

    % Querying makes the group start up; the result shows whether the
    % truncation was successful
    {ok, {ResultAfter}} = couch_set_view_test_util:query_view(
        test_set_name(), ddoc_id(), <<"testred">>, ["stale=ok"]),
    etap:is(ResultAfter, FirstResult,
        "View returns the correct value after trunction"),
    shutdown_group().
168
169
% Compacts the index so that only a single header is left in the file, then
% requests a rollback to a sequence number older than that header. The
% expected outcome is a header in which every partition is back at
% sequence number 0.
test_rollback_to_first_header() ->
    etap:diag("Testing rollback to a previous header which doesn't "
      "exist anymore due to compaction"),
    setup_test(30),

    % Only the indexed data matters here, the returned list is not needed
    insert_data(3),
    Fd1 = get_fd(),
    {ok, HeaderBin1, Pos1} = couch_file:read_header_bin(Fd1),
    Header1 = couch_set_view_util:header_bin_to_term(HeaderBin1),
    HeaderSeqs1 = Header1#set_view_index_header.seqs,

    etap:diag("Triggering compaction"),
    {ok, CompactPid} = couch_set_view_compactor:start_compact(
        mapreduce_view, test_set_name(), ddoc_id(), main),
    Ref = erlang:monitor(process, CompactPid),
    etap:diag("Waiting for main group compaction to finish"),
    receive
        {'DOWN', Ref, process, CompactPid, normal} ->
            ok;
        {'DOWN', Ref, process, CompactPid, noproc} ->
            % The compactor was already gone when the monitor was set up
            ok;
        {'DOWN', Ref, process, CompactPid, Reason} ->
            etap:bail("Failure compacting main group: " ++
                couch_util:to_list(Reason))
    after 5000 ->
        etap:bail("Timeout waiting for main group compaction to finish")
    end,

    Fd2 = get_fd(),

    % Verify that the file got compacted
    {ok, HeaderBin2, Pos2} = couch_file:read_header_bin(Fd2),
    Header2 = couch_set_view_util:header_bin_to_term(HeaderBin2),
    HeaderSeqs2 = Header2#set_view_index_header.seqs,
    etap:is(HeaderSeqs2, HeaderSeqs1, "The sequence numbers in the "
        "latest header are the same pre and post compaction"),
    etap:ok(Pos2 < Pos1, "Header is further at the front of the file"),

    % Verify that there are no previous headers left
    {ok, _HeaderBin3, Pos3} = couch_file:find_header_bin(Fd2, Pos2 - 1),
    etap:is(Pos3, 0,
        "There are no headers other than the one the file starts with"),

    % Pick a sequence number that is lower than the one from the header,
    % hence doesn't exist in the file
    PartId = 0,
    PartSeq = couch_set_view_util:get_part_seq(PartId, HeaderSeqs2) - 1,

    % Rollback the index
    ok = rollback_group([{PartId, PartSeq}]),

    % The rollback is to the first header of the file, which means
    % that a complete re-indexing is needed.
    Fd3 = get_fd(),
    {ok, HeaderBin4, _Pos4} = couch_file:read_header_bin(Fd3),
    Header4 = couch_set_view_util:header_bin_to_term(HeaderBin4),
    HeaderSeqs4 = Header4#set_view_index_header.seqs,
    % A distinct generator variable is used on purpose: PartId is already
    % bound above and would only be shadowed inside the comprehension.
    ExpectedSeqs = [{Id, 0} || {Id, _Seq} <- HeaderSeqs1],
    etap:is(HeaderSeqs4, ExpectedSeqs,
        "The sequence numbers are reset to 0"),

    shutdown_group().
230
231
% Starts a compaction, pauses it and performs a rollback while the
% compactor process is still alive. The rollback is expected to terminate
% the compactor with exit reason `shutdown`.
test_rollback_during_compaction() ->
    etap:diag("Testing rollback during compaction"),
    setup_test(25),

    Inserted = insert_data(10),

    etap:diag("Triggering compaction"),
    {ok, CompactPid} = couch_set_view_compactor:start_compact(
        mapreduce_view, test_set_name(), ddoc_id(), main),
    Ref = erlang:monitor(process, CompactPid),
    CompactPid ! pause,
    % Nothing is waited for here: `after 0` only checks whether a 'DOWN'
    % message is already in the mailbox.
    etap:diag("Checking that the main group compaction got paused"),
    receive
        {'DOWN', Ref, process, CompactPid, _Reason} ->
            etap:bail("Compaction finished before it got paused")
    after 0 ->
        ok
    end,
    etap:is(is_process_alive(CompactPid), true, "Compactor is still running"),

    rollback(Inserted, 5),
    receive
        {'DOWN', Ref, process, CompactPid, shutdown} ->
            etap:ok(true, "Compaction was shutdown properly");
        {'DOWN', Ref, process, CompactPid, Reason} ->
            % Report the actual exit reason instead of dropping it
            etap:bail(lists:flatten(io_lib:format(
                "Compaction unexpectedly stopped: ~p", [Reason])))
    after 0 ->
        etap:bail("Compaction should have been stopped")
    end,
    etap:is(is_process_alive(CompactPid), false,
        "Compactor is not running after the rollback"),
    shutdown_group().
264
265
% Marks the upper half of the partitions as unindexable, indexes more
% data and then requests a rollback of the last partition to its
% unindexable sequence number. The on-disk header is expected to keep the
% indexable sequence numbers it had before the rollback request.
test_rollback_unindexable_seqs() ->
    etap:diag("Testing rollback with header that contains unindexable "
        "partitions"),
    setup_test(30),

    populate_set(1, num_docs()),
    {ok, {_}} = couch_set_view_test_util:query_view(
        test_set_name(), ddoc_id(), <<"testred">>, []),

    % Make the upper half of the partitions unindexable, add data and let
    % the updater write a new header.

    IndexFd = get_fd(),
    LastPart = num_set_partitions() - 1,
    UnindexableParts = lists:seq(num_set_partitions() div 2, LastPart),
    ok = couch_set_view:mark_partitions_unindexable(
        mapreduce_view, test_set_name(), ddoc_id(), UnindexableParts),
    populate_set(num_docs() + 1, 2 * num_docs()),
    trigger_updater(),
    IndexableSeqs = get_seq_from_group(),
    UnindexableSeqs = get_unindexable_seq_from_group(),
    etap:is([Id || {Id, _} <- UnindexableSeqs], UnindexableParts,
        "Partitions were set to unindexable"),
    {ok, BinBefore, _} = couch_file:read_header_bin(IndexFd),
    HeaderBefore = couch_set_view_util:header_bin_to_term(BinBefore),
    etap:is(HeaderBefore#set_view_index_header.seqs, IndexableSeqs,
        "The on-disk header has the same indexable sequence numbers as the "
        "group header"),
    etap:is(HeaderBefore#set_view_index_header.unindexable_seqs,
        UnindexableSeqs,
        "The on-disk header has the same unindexable sequence numbers as the "
        "group header"),

    % The most current header now contains unindexable partitions with the
    % same sequence number as the header before it. If the unindexable
    % partitions are taken into account correctly, a rollback to that
    % sequence number keeps the file as it is instead of actually rolling
    % back.

    RollbackSeq = couch_set_view_util:get_part_seq(LastPart, UnindexableSeqs),
    ok = rollback_group([{LastPart, RollbackSeq}]),

    {ok, BinAfter, _} = couch_file:read_header_bin(IndexFd),
    HeaderAfter = couch_set_view_util:header_bin_to_term(BinAfter),
    etap:is(HeaderAfter#set_view_index_header.seqs, IndexableSeqs,
        "The most recent header of the truncated file has the correct "
        "sequence numbers"),
    shutdown_group().
316
317
% Creates the view without the last partition, adds that partition later
% and then rolls it back to a sequence number below its current one. The
% rollback is expected to end at a header that lists the added partition
% with sequence number 0, while the partition stays active in the group.
test_rollback_nonexistent() ->
    etap:diag("Testing rollback with old headers not containing the "
      "requested partition (the most recent does)"),

    % Create a view with the last partition missing

    setup_test(30, num_set_partitions() - 1),
    populate_set(1, num_docs()),
    {ok, {_ViewResults1}} = couch_set_view_test_util:query_view(
        test_set_name(), ddoc_id(), <<"testred">>, []),
    GroupSeqs1 = get_seq_from_group(),

    PartId = num_set_partitions() - 1,
    etap:is(couch_set_view_util:has_part_seq(PartId, GroupSeqs1), false,
        "Last Partition is currently not part of the index"),

    % Add the last partition, insert more data and make sure a new header
    % is written.

    Fd = get_fd(),
    AllPartitions = lists:seq(0, num_set_partitions() - 1),
    ok = couch_set_view:set_partition_states(
        mapreduce_view, test_set_name(), ddoc_id(), AllPartitions, [], []),
    populate_set(num_docs() + 1, 2 * num_docs()),
    trigger_updater(),
    GroupSeqs2 = get_seq_from_group(),

    % Rollback to a sequence number that is smaller than the current
    % sequence number of the last partition. Verify that it got rolled back
    % to the first header that doesn't contain the partition.

    PartSeq = couch_set_view_util:get_part_seq(PartId, GroupSeqs2) - 1,
    ok = rollback_group([{PartId, PartSeq}]),
    {ok, HeaderBin, _Pos} = couch_file:read_header_bin(Fd),
    Header = couch_set_view_util:header_bin_to_term(HeaderBin),
    HeaderSeqs = Header#set_view_index_header.seqs,
    % NOTE vmx 2013-11-05: Having the newly added partition as part of the
    % header it was rolled back to, leverages an implementation detail of
    % the indexer. Whenever a new partition is added a new header with the
    % same sequence numbers as before plus the new partition with sequence
    % number 0 is written. In case that changes in the future this test
    % will fail.
    % The added partition is derived from PartId rather than hard-coded so
    % the expectation follows num_set_partitions().
    Expected = GroupSeqs1 ++ [{PartId, 0}],
    etap:is(HeaderSeqs, Expected, "Rollback contains all partitions"),
    GroupSeqs3 = get_seq_from_group(),
    etap:is(HeaderSeqs, GroupSeqs3,
        "On-disk header has the same sequence numbers as the group"),
    GroupInfo = get_group_info(),
    {active_partitions, ActivePartitions} = lists:keyfind(
        active_partitions, 1, GroupInfo),
    etap:is(ActivePartitions, AllPartitions,
        "The partition the on-disk header is missing is part of the active "
        "partitions as expected"),
    shutdown_group().
371
372
% Requests a rollback for a partition that was never part of the index and
% checks that the on-disk header stays byte-for-byte the same.
test_rollback_never_existed() ->
    % The trailing space keeps the two adjacent literals from fusing into
    % "indexnever".
    etap:diag("Testing rollback with request a partition that the index "
       "never contained"),

    % Create a view with the last partition missing
    setup_test(30, num_set_partitions() - 1),

    % Only the indexed data matters here, the returned list is not needed
    insert_data(3),
    GroupSeqs = get_seq_from_group(),
    PartId = num_set_partitions() - 1,
    etap:is(couch_set_view_util:has_part_seq(PartId, GroupSeqs), false,
        "Last Partition is not part of the index"),

    % Get header information before the rollback

    Fd = get_fd(),
    {ok, HeaderBin0, _Pos0} = couch_file:read_header_bin(Fd),

    % Rollback

    % Borrow a valid sequence number from the neighbouring partition, as
    % the requested partition has none in the index
    PartSeq = couch_set_view_util:get_part_seq(PartId - 1, GroupSeqs),
    ok = rollback_group([{PartId, PartSeq}]),

    % Verify that no rollback happened

    {ok, HeaderBin, _Pos} = couch_file:read_header_bin(Fd),
    Header = couch_set_view_util:header_bin_to_term(HeaderBin),
    HeaderSeqs = Header#set_view_index_header.seqs,
    etap:is(HeaderBin, HeaderBin0, "Header is equal to the old one"),
    etap:is(HeaderSeqs, GroupSeqs,
        "On-disk header has the same sequence numbers as the group"),
    shutdown_group().
406
407
% Rolls back from a state in which the last two partitions were removed to
% an older state that still contained them. The group info is then
% expected to list both partitions under `cleanup_partitions` and not under
% `update_seqs`.
test_rollback_mark_for_cleanup() ->
    etap:diag("Testing rollback with old headers containing more partitions "
      "than the current one where the rollback starts"),

    setup_test(30),
    % The descriptor itself is not used in this test. The call is kept
    % because a comment elsewhere in this file notes that get_fd() triggers
    % the updater -- TODO confirm whether it is needed here.
    get_fd(),
    MissingA = num_set_partitions() - 2,
    MissingB = num_set_partitions() - 1,
    MissingPartitions = [MissingA, MissingB],

    populate_set(1, num_docs() div 2),
    trigger_initial_build(),
    GroupSeqs1 = get_seq_from_group(),
    etap:is(length(GroupSeqs1), num_set_partitions(),
        "All partitions are indexable"),

    % Mark last partition as unindexable
    ok = couch_set_view:mark_partitions_unindexable(
        mapreduce_view, test_set_name(), ddoc_id(), [MissingB]),

    % Several updates need to happen in order to have a way to roll back
    % to a header where there are unindexable partitions

    populate_set(
        (num_docs() div 2) + 1, (num_docs() div 2) + (num_docs() div 4)),
    trigger_updater(),
    GroupSeqs2 = get_seq_from_group(),
    GroupSeqUnindexable2 = get_unindexable_seq_from_group(),
    etap:is(length(GroupSeqs2), length(GroupSeqs1) - 1,
        "One partitions is not indexable"),
    etap:is(length(GroupSeqUnindexable2), 1, "One partitions is unindexable"),
    populate_set((num_docs() div 2) + (num_docs() div 4) + 1, num_docs()),
    trigger_updater(),
    GroupSeqUnindexable3 = get_unindexable_seq_from_group(),
    etap:is(length(GroupSeqUnindexable3), 1,
        "One partitions is still unindexable"),

    % Remove the two last partitions, insert more data and make sure a
    % new header is written.

    % A partition can only be removed (cleaned up) when it is indexable
    ok = couch_set_view:mark_partitions_indexable(
        mapreduce_view, test_set_name(), ddoc_id(),
        [MissingB]),

    ok = couch_set_view:set_partition_states(
        mapreduce_view, test_set_name(), ddoc_id(), [], [],
        MissingPartitions),

    populate_set(num_docs() + 1, 2 * num_docs()),
    trigger_updater(),
    GroupSeqs4 = get_seq_from_group(),
    etap:is(couch_set_view_util:has_part_seq(MissingA, GroupSeqs4),
        false,
        "The second to last partition is currently not part of the index"),
    etap:is(couch_set_view_util:has_part_seq(MissingB, GroupSeqs4),
        false,
        "Last partition is currently not part of the index"),

    % Rollback to a sequence where the last partition is part of and
    % verify that the additional partition is marked for cleanup and
    % not part of the sequences

    PartId = 0,
    PartSeq = couch_set_view_util:get_part_seq(PartId, GroupSeqs2),
    ok = rollback_group([{PartId, PartSeq}]),

    GroupInfo = get_group_info(),
    {cleanup_partitions, CleanupPartitions} = lists:keyfind(
        cleanup_partitions, 1, GroupInfo),
    etap:is(CleanupPartitions, MissingPartitions,
        "Last partitions were correctly marked for cleanup"),
    {update_seqs, {UpdateSeqs}} = lists:keyfind(update_seqs, 1, GroupInfo),
    etap:is(couch_set_view_util:has_part_seq(MissingA, UpdateSeqs),
        false,
        "Second to last partition is currently not part of the index"),
    etap:is(couch_set_view_util:has_part_seq(MissingB, UpdateSeqs),
        false,
        "Last Partition is currently not part of the index"),
    shutdown_group().
489
490
% Requests a rollback for three partitions at once, each with a sequence
% number taken from a different recorded batch (5, 6 and 3). The view is
% expected to return the result recorded for batch 3, the oldest of them.
test_rollback_multiple_partitions() ->
    etap:diag("Testing rollback with multiple partitions"),
    setup_test(25),

    random:seed({4, 5, 6}),
    Inserted = insert_data_randomly(8),

    % Only the view result of the oldest requested batch is compared below
    {PartSeqs3, ViewResult3} = lists:nth(3, Inserted),
    {PartSeqs5, _} = lists:nth(5, Inserted),
    {PartSeqs6, _} = lists:nth(6, Inserted),
    PartSeqs = [
        {0, couch_set_view_util:get_part_seq(0, PartSeqs5)},
        {1, couch_set_view_util:get_part_seq(1, PartSeqs6)},
        {2, couch_set_view_util:get_part_seq(2, PartSeqs3)}
    ],

    ok = rollback_group(PartSeqs),

    {ok, {ViewResultTruncated}} = couch_set_view_test_util:query_view(
        test_set_name(), ddoc_id(), <<"testred">>, ["stale=ok"]),
    etap:is(ViewResultTruncated, ViewResult3,
            "View returns the correct value after trunction"),
    GroupSeqs = get_seq_from_group(),
    % The group has to be at one of the recorded states
    Found = lists:member(GroupSeqs, [Seqs || {Seqs, _} <- Inserted]),
    etap:ok(Found, "The current header matches a previous state"),

    shutdown_group().
518
519
% Removes the last partition from the index, adds it back and then rolls
% back all partitions to the sequence numbers recorded before the removal.
% The group is expected to hold those sequence numbers again, except that
% the re-added partition is listed last with sequence number 0.
test_rollback_multiple_partitions_missing() ->
    etap:diag("Testing rollback with multiple partitions where one partition "
      "was not always part of the index"),

    setup_test(30),
    populate_set(1, num_docs()),
    {ok, {_}} = couch_set_view_test_util:query_view(
        test_set_name(), ddoc_id(), <<"testred">>, []),
    SeqsAll = get_seq_from_group(),
    etap:is(length(SeqsAll), num_set_partitions(),
        "All partitions are there"),

    % Take the last partition out of the index, add data and let the
    % updater write a new header.

    LastPart = num_set_partitions() - 1,
    ok = couch_set_view:set_partition_states(
        mapreduce_view, test_set_name(), ddoc_id(), [], [], [LastPart]),
    populate_set(num_docs() + 1, 2 * num_docs()),
    trigger_updater(),
    SeqsWithout = get_seq_from_group(),
    etap:is(couch_set_view_util:has_part_seq(LastPart, SeqsWithout),
        false,
        "Last Partition is currently not part of the index"),

    % Put the last partition back, add data and let the updater write a
    % new header.

    ok = couch_set_view:set_partition_states(
        mapreduce_view, test_set_name(), ddoc_id(), [LastPart], [], []),
    populate_set(num_docs() + 1, 2 * num_docs()),
    trigger_updater(),
    SeqsReadded = get_seq_from_group(),
    etap:is(couch_set_view_util:has_part_seq(LastPart, SeqsReadded),
        true,
        "Last Partition is again part of the index"),

    % Roll back to the sequence numbers from before the removal and verify
    % that the rollback happened

    ok = rollback_group(SeqsAll),
    % As the last partition is part of the group, it will be added
    % after the rollback with sequence number 0.
    SeqsExpected = lists:keydelete(LastPart, 1, SeqsAll) ++ [{LastPart, 0}],
    etap:is(get_seq_from_group(), SeqsExpected, "Rollback is correct"),
    shutdown_group().
569
570
% Marks the upper half of the partitions unindexable, later makes the
% first of them indexable again and then rolls that partition back to the
% sequence number it had while unindexable. After the rollback the header
% is expected to list the same indexable and unindexable partition IDs as
% before, with different indexable sequence numbers and unchanged
% unindexable ones.
test_rollback_unindexable_now_indexable_seqs() ->
    etap:diag("Testing rollback with header that contains unindexable "
        "partitions which are then made indexable"),
    setup_test(30),

    % Mark a few partitions as unindexable, insert data and make
    % sure a new header is written.

    populate_set(1, num_docs() div 2),
    {ok, {_ViewResults1}} = couch_set_view_test_util:query_view(
        test_set_name(), ddoc_id(), <<"testred">>, []),
    Fd = get_fd(),
    Unindexable = lists:seq(num_set_partitions() div 2, num_set_partitions() - 1),
    ok = couch_set_view:mark_partitions_unindexable(
        mapreduce_view, test_set_name(), ddoc_id(), Unindexable),

    populate_set((num_docs() div 2) + 1, num_docs()),
    trigger_updater(),

    % The indexable sequence numbers are not checked at this point; the
    % call is kept so the sequence of group requests stays as it was.
    _GroupSeqs = get_seq_from_group(),
    GroupSeqsUnindexable = get_unindexable_seq_from_group(),
    etap:is([Id || {Id, _} <- GroupSeqsUnindexable], Unindexable,
        "Partitions were set to unindexable"),

    % Request partitions 0 up to and including num_set_partitions() div 2
    % to be indexable. Of those only the last one was marked unindexable
    % above, so effectively one partition changes state. Then insert more
    % data and make sure a new header is written.

    Indexable = lists:seq(0, num_set_partitions() div 2),
    ok = couch_set_view:mark_partitions_indexable(
        mapreduce_view, test_set_name(), ddoc_id(), Indexable),
    populate_set(num_docs() + 1, 2 * num_docs()),
    trigger_updater(),
    GroupSeqs2 = get_seq_from_group(),
    GroupSeqsUnindexable2 = get_unindexable_seq_from_group(),
    etap:is([Id || {Id, _} <- GroupSeqs2], Indexable,
        "The expected partitions are indexable"),
    etap:is(GroupSeqsUnindexable2, tl(GroupSeqsUnindexable),
        "The expected partitions are unindexable"),

    {ok, HeaderBin1, _Pos1} = couch_file:read_header_bin(Fd),
    Header1 = couch_set_view_util:header_bin_to_term(HeaderBin1),
    HeaderSeqs1 = Header1#set_view_index_header.seqs,
    HeaderSeqsUnindexable1 = Header1#set_view_index_header.unindexable_seqs,
    etap:is(HeaderSeqs1, GroupSeqs2,
        "The on-disk header has the same indexable sequence numbers as the "
        "group header"),
    etap:is(HeaderSeqsUnindexable1, GroupSeqsUnindexable2,
        "The on-disk header has the same unindexable sequence numbers as the "
        "group header"),

    % Rollback to the state when more unindexable partitions existed

    PartId = num_set_partitions() div 2,
    PartSeq = couch_set_view_util:get_part_seq(PartId, GroupSeqsUnindexable),
    ok = rollback_group([{PartId, PartSeq}]),

    % Check that the new header contains the same indexable and unindexable
    % partitions as before the rollback. Only their sequence numbers should
    % be different.
    % The comprehensions use `Id` because PartId is bound above and would
    % only be shadowed by a generator pattern.

    {ok, HeaderBin2, _Pos2} = couch_file:read_header_bin(Fd),
    Header2 = couch_set_view_util:header_bin_to_term(HeaderBin2),
    HeaderSeqs2 = Header2#set_view_index_header.seqs,
    HeaderSeqsUnindexable2 = Header2#set_view_index_header.unindexable_seqs,
    etap:is([Id || {Id, _} <- HeaderSeqs2],
        [Id || {Id, _} <- GroupSeqs2],
        "The most recent header of the truncated file has the correct "
        "indexable partition IDs"),
    etap:is([Id || {Id, _} <- HeaderSeqsUnindexable2],
        [Id || {Id, _} <- GroupSeqsUnindexable2],
        "The most recent header of the truncated file has the correct "
        "unindexable partition IDs"),
    etap:isnt([Seq || {_, Seq} <- HeaderSeqs2],
        [Seq || {_, Seq} <- GroupSeqs2],
        "The most recent header of the truncated file has different "
        "indexable partition sequence numbers than the original header"),
    etap:is([Seq || {_, Seq} <- HeaderSeqsUnindexable2],
        [Seq || {_, Seq} <- GroupSeqsUnindexable2],
        "The most recent header of the truncated file has the same "
        "unindexable partition sequence numbers than the original header"),
    shutdown_group().
652
% Builds the index without the last partition, adds that partition, indexes
% more data and then rolls partition 0 back to its sequence number from
% before the addition. After the updater ran again, the header is expected
% to hold a partition version entry and the full sequence number for every
% partition.
test_rollback_added_partitions() ->
    etap:diag("Testing rollback after new partitons are added"),

    setup_test(30, num_set_partitions() - 1),
    GroupPid = couch_set_view:get_group_pid(
        mapreduce_view, test_set_name(), ddoc_id(), prod),
    % The descriptor obtained here is not used. The call is kept because a
    % comment elsewhere in this file notes that get_fd() triggers the
    % updater -- TODO confirm whether it is needed here.
    get_fd(),
    populate_set(1, num_docs()),
    trigger_initial_build(),

    GroupSeqs = get_seq_from_group(),
    PartId = 0,
    PartSeq = couch_set_view_util:get_part_seq(PartId, GroupSeqs),

    NewPartition = num_set_partitions() - 1,
    ok = couch_set_view:set_partition_states(
        mapreduce_view, test_set_name(), ddoc_id(), [NewPartition], [],
        []),

    populate_set(num_docs() + 1, 2 * num_docs()),
    trigger_updater(),

    % Every partition is expected to be at the same sequence number once
    % both batches are indexed. The same expectation holds before the
    % rollback and after the re-indexing that follows it.
    SeqsExpected = [
        {Id, (num_docs() * 2) div num_set_partitions()} ||
        Id <- lists:seq(0, num_set_partitions() - 1)
    ],

    {ok, {[{<<"total_rows">>, Rows}, _, _]}} = couch_set_view_test_util:query_view(
        test_set_name(), ddoc_id(), <<"test">>, []),
    GroupSeqs1 = get_seq_from_group(),
    etap:is(Rows, num_docs() * 2,
        "Expected number of rows found"),
    etap:is(GroupSeqs1, SeqsExpected, "Seqs are present as expected"),

    ok = rollback_group([{PartId, PartSeq}]),

    trigger_updater(),
    couch_set_view_test_util:wait_for_updater_to_finish(GroupPid, ?MAX_WAIT_TIME),
    Fd2 = get_fd(),
    {ok, HeaderBin2, _Pos2} = couch_file:read_header_bin(Fd2),
    Header2 = couch_set_view_util:header_bin_to_term(HeaderBin2),
    PartVersions2 = Header2#set_view_index_header.partition_versions,
    Seqs2 = Header2#set_view_index_header.seqs,
    etap:is(length(PartVersions2), num_set_partitions(),
        "Expected number of entries found in partitions version list"),

    etap:is(Seqs2, SeqsExpected, "Seqs are present as expected"),
    couch_set_view_test_util:wait_for_updater_to_finish(GroupPid, ?MAX_WAIT_TIME),
    shutdown_group().
706
707
% Populates the set batch by batch and queries the "testred" view after
% each one. Returns a list of {GroupSeqs, ViewResult} tuples in insertion
% order.
% The counter starts at 1 and the loop stops as soon as it reaches
% NumBatches, so NumBatches - 1 batches are inserted. For NumBatches < 1
% nothing is inserted and [] is returned.
% NOTE(review): each batch is populate_set(From, From + num_docs()); if
% both bounds are inclusive that is num_docs() + 1 documents, sharing one
% ID with the following batch -- confirm this is intended.
insert_data(NumBatches) ->
    insert_data(NumBatches, 1, []).

% A guard instead of an equality match on the two counters makes sure the
% recursion also ends when NumBatches is smaller than the start value.
insert_data(NumBatches, Count, Acc) when Count >= NumBatches ->
    lists:reverse(Acc);
insert_data(NumBatches, Count, Acc0) ->
    From = (Count * num_docs()) + 1,
    populate_set(From, From + num_docs()),
    {ok, {ViewResult}} = couch_set_view_test_util:query_view(
        test_set_name(), ddoc_id(), <<"testred">>, []),
    GroupSeqs = get_seq_from_group(),
    Acc = [{GroupSeqs, ViewResult} | Acc0],
    insert_data(NumBatches, Count + 1, Acc).
720
% Same as insert_data/1, but every batch is written with
% populate_set_randomly/2. Returns a list of {GroupSeqs, ViewResult}
% tuples in insertion order.
% The counter starts at 1 and the loop stops as soon as it reaches
% NumBatches, so NumBatches - 1 batches are inserted. For NumBatches < 1
% nothing is inserted and [] is returned.
insert_data_randomly(NumBatches) ->
    insert_data_randomly(NumBatches, 1, []).

% A guard instead of an equality match on the two counters makes sure the
% recursion also ends when NumBatches is smaller than the start value.
insert_data_randomly(NumBatches, Count, Acc) when Count >= NumBatches ->
    lists:reverse(Acc);
insert_data_randomly(NumBatches, Count, Acc0) ->
    From = (Count * num_docs()) + 1,
    populate_set_randomly(From, From + num_docs()),
    {ok, {ViewResult}} = couch_set_view_test_util:query_view(
        test_set_name(), ddoc_id(), <<"testred">>, []),
    GroupSeqs = get_seq_from_group(),
    Acc = [{GroupSeqs, ViewResult} | Acc0],
    insert_data_randomly(NumBatches, Count + 1, Acc).
733
% Rolls partition 1 back to the sequence number it had in entry number From
% of Inserted (a list of {GroupSeqs, ViewResult} tuples). Afterwards it
% checks that the "testred" view returns the result recorded in that entry
% and that the group's sequence numbers equal those of some recorded entry.
rollback(Inserted, From) ->
    {TargetSeqs, TargetResult} = lists:nth(From, Inserted),
    Part = 1,
    ok = rollback_group(
        [{Part, couch_set_view_util:get_part_seq(Part, TargetSeqs)}]),

    {ok, {ResultAfter}} = couch_set_view_test_util:query_view(
        test_set_name(), ddoc_id(), <<"testred">>, ["stale=ok"]),
    etap:is(ResultAfter, TargetResult,
        "View returns the correct value after trunction"),
    CurrentSeqs = get_seq_from_group(),
    KnownStates = [Seqs || {Seqs, _} <- Inserted],
    etap:ok(lists:member(CurrentSeqs, KnownStates),
        "The current header matches a previous state").
747
748
% Convenience variant that configures the view group for all partitions of
% the set.
setup_test(ReduceSize) ->
    setup_test(ReduceSize, num_set_partitions()).

% Recreates the set databases, stores the design document with the views
% "test" (map only) and "testred" (map and reduce) and configures the view
% group with NumViewPartitions.
% The reduce function returns a two-element array whose first element is a
% string of ReduceSize bytes produced by random_binary/1 and whose second
% element is the maximum of the values.
setup_test(ReduceSize, NumViewPartitions) ->
    couch_set_view_test_util:delete_set_dbs(test_set_name(), num_set_partitions()),
    couch_set_view_test_util:create_set_dbs(test_set_name(), num_set_partitions()),

    Filler = random_binary(ReduceSize),
    MapFun = <<"function(doc, meta) { emit(meta.id, doc.value); }">>,
    ReduceFun = <<"function(key, values, rereduce) {\n"
        "if (rereduce) values = values.map("
        "function(elem) {return elem[1];});"
        "var max = Math.max.apply(this, values);"
        "return [\"", Filler/binary, "\", max];}">>,
    Views = {[
        {<<"test">>, {[
            {<<"map">>, MapFun}
        ]}},
        {<<"testred">>, {[
            {<<"map">>, MapFun},
            {<<"reduce">>, ReduceFun}
        ]}}
    ]},
    DDoc = {[
        {<<"meta">>, {[{<<"id">>, ddoc_id()}]}},
        {<<"json">>, {[{<<"views">>, Views}]}}
    ]},
    ok = couch_set_view_test_util:update_ddoc(test_set_name(), DDoc),
    ok = configure_view_group(NumViewPartitions).
776
%% Returns a binary of `N' bytes, each in the range 101..120.
%%
%% The generator is re-seeded with a fixed seed on every call, so the same
%% `N' always yields the same binary. Note that seeding changes the calling
%% process' `random' state.
random_binary(N) ->
    random:seed({1, 2, 3}),
    Bytes = [random:uniform(20) + 100 || _ <- lists:seq(1, N)],
    list_to_binary(Bytes).
780
781
%% Waits for any running updater to finish, deletes the test databases and
%% then blocks until the view group process is reported down.
%% Bails out of the test run if no 'DOWN' message arrives within 10 seconds.
shutdown_group() ->
    GroupPid = get_group_pid(),
    couch_set_view_test_util:wait_for_updater_to_finish(GroupPid, ?MAX_WAIT_TIME),
    couch_set_view_test_util:delete_set_dbs(test_set_name(), num_set_partitions()),
    %% The monitor is installed after the deletion; if the group is already
    %% gone by then the 'DOWN' message is delivered immediately.
    MonRef = erlang:monitor(process, GroupPid),
    receive
        {'DOWN', MonRef, _Type, _Object, _Reason} ->
            ok
    after 10000 ->
        etap:bail("Timeout waiting for group shutdown")
    end.
794
795
%% Generates the documents numbered `From'..`To' (see generate_docs/2) and
%% loads them with couch_set_view_test_util:populate_set_sequentially/3
%% into partitions 0..num_set_partitions()-1.
%% The diagnostic line always reports num_docs(), whatever the actual range.
populate_set(From, To) ->
    Banner = lists:concat([
        "Populating the ", num_set_partitions(),
        " databases with ", num_docs(), " documents"
    ]),
    etap:diag(Banner),
    Docs = generate_docs(From, To),
    PartIds = lists:seq(0, num_set_partitions() - 1),
    ok = couch_set_view_test_util:populate_set_sequentially(
        test_set_name(), PartIds, Docs).
804
%% Generates the documents numbered `From'..`To' (see generate_docs/2) and
%% loads them with couch_set_view_test_util:populate_set_randomly/3 into
%% partitions 0..num_set_partitions()-1.
%% The diagnostic line always reports num_docs(), whatever the actual range.
populate_set_randomly(From, To) ->
    Banner = lists:concat([
        "Populating the ", num_set_partitions(),
        " databases with ", num_docs(), " documents"
    ]),
    etap:diag(Banner),
    Docs = generate_docs(From, To),
    PartIds = lists:seq(0, num_set_partitions() - 1),
    ok = couch_set_view_test_util:populate_set_randomly(
        test_set_name(), PartIds, Docs).
813
%% Builds one EJSON document per integer in `From'..`To' (both inclusive).
%%
%% Document `I' has the id <<"docI">> under "meta" and `{"value": I}' under
%% "json". Returns the documents in ascending order of `I'; the result is
%% `[]' when `To =:= From - 1' (lists:seq/2 semantics).
generate_docs(From, To) ->
    [
        {[
            {<<"meta">>, {[{<<"id">>, iolist_to_binary(["doc", integer_to_list(I)])}]}},
            {<<"json">>, {[{<<"value">>, I}]}}
        ]}
        || I <- lists:seq(From, To)
    ].
824
825
%% Defines the view group for the test design document with partitions
%% 0..NumViewPartitions-1 active, no passive partitions and no replica index.
%%
%% Returns whatever couch_set_view:define_group/4 returns. If that call
%% raises, the exception is not propagated: its reason term is returned
%% instead, so callers can match on it.
configure_view_group(NumViewPartitions) ->
    etap:diag("Configuring view group"),
    ActiveParts = lists:seq(0, NumViewPartitions - 1),
    Params = #set_view_params{
        max_partitions = num_set_partitions(),
        active_partitions = ActiveParts,
        passive_partitions = [],
        use_replica_index = false
    },
    try couch_set_view:define_group(
            mapreduce_view, test_set_name(), ddoc_id(), Params) of
        Result ->
            Result
    catch
        _Class:Reason ->
            Reason
    end.
840
841
%% Returns the result of couch_set_view:get_group_pid/4 for the `prod'
%% mapreduce view group of the test set's design document.
get_group_pid() ->
    SetName = test_set_name(),
    DDocId = ddoc_id(),
    couch_set_view:get_group_pid(mapreduce_view, SetName, DDocId, prod).
845
846
%% Requests the group from the group server with `stale = ok' and
%% `debug = true' and returns the group term from the reply.
%% The reply must be `{ok, Group, 0}'; anything else crashes with badmatch.
get_group_snapshot() ->
    Req = #set_view_group_req{stale = ok, debug = true},
    {ok, Snapshot, 0} = gen_server:call(get_group_pid(), Req, infinity),
    Snapshot.
853
854
%% Returns the info term obtained from
%% couch_set_view_group:request_group_info/1 for the test view group.
get_group_info() ->
    {ok, Info} = couch_set_view_group:request_group_info(get_group_pid()),
    Info.
860
861
%% Extracts the `update_seqs' entry from the group info and returns it as a
%% list of `{PartId, Seq}' pairs with the partition id converted to an
%% integer. Crashes with badmatch if the entry is absent.
get_seq_from_group() ->
    {update_seqs, {PartSeqs}} =
        lists:keyfind(update_seqs, 1, get_group_info()),
    [{couch_util:to_integer(Id), UpdateSeq} || {Id, UpdateSeq} <- PartSeqs].
866
867
%% Extracts the `unindexable_partitions' entry from the group info and
%% returns it as a list of `{PartId, Seq}' pairs with the partition id
%% converted to an integer. Crashes with badmatch if the entry is absent.
get_unindexable_seq_from_group() ->
    {unindexable_partitions, {PartSeqs}} =
        lists:keyfind(unindexable_partitions, 1, get_group_info()),
    [{couch_util:to_integer(Id), UnindexableSeq}
        || {Id, UnindexableSeq} <- PartSeqs].
873
874
%% Returns the `fd' field of the current group snapshot
%% (see get_group_snapshot/0).
get_fd() ->
    (get_group_snapshot())#set_view_group.fd.
878
879
%% Requests the group with `stale = false' (and `debug = true'), waiting at
%% most ?MAX_WAIT_TIME milliseconds for the reply.
%% Returns the `{ok, _, _}' reply; any other reply crashes with badmatch.
trigger_initial_build() ->
    Req = #set_view_group_req{stale = false, debug = true},
    {ok, _, _} = gen_server:call(get_group_pid(), Req, ?MAX_WAIT_TIME).
885
886
%% Asks the group server to start an updater and waits until it has exited.
%%
%% Returns `ok' when the reply carries `nil' instead of a pid (nothing to
%% wait for) or when the updater exits with `{updater_finished, _}'.
%% Bails out of the test run on any other exit reason, or if the updater
%% is still running after 5 seconds.
trigger_updater() ->
    GroupPid = get_group_pid(),
    {ok, UpPid} = gen_server:call(GroupPid, {start_updater, []}, infinity),
    case UpPid of
    nil ->
        ok;
    _ ->
        UpRef = erlang:monitor(process, UpPid),
        receive
        {'DOWN', UpRef, process, UpPid, {updater_finished, _}} ->
            ok;
        {'DOWN', UpRef, process, UpPid, noproc} ->
            %% The updater had already exited before the monitor was set
            %% up, so its real exit reason is not available. Treat this as
            %% "finished" rather than aborting the whole run on a race.
            ok;
        {'DOWN', UpRef, process, UpPid, Reason} ->
            etap:bail("Updater died with unexpected reason: " ++
                couch_util:to_list(Reason))
        after 5000 ->
            etap:bail("Timeout waiting for updater to finish")
        end
    end.
905
906
%% Sends `{test_rollback, RollbackSeqs}' to the group server and returns its
%% reply. `RollbackSeqs' is a list of `{PartId, Seq}' pairs (see rollback/2).
%% Uses the default gen_server:call/2 timeout.
rollback_group(RollbackSeqs) ->
    Request = {test_rollback, RollbackSeqs},
    gen_server:call(get_group_pid(), Request).
910