1#!/usr/bin/env escript
2%% -*- Mode: Erlang; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
3%%! -smp enable
4
5% Licensed under the Apache License, Version 2.0 (the "License"); you may not
6% use this file except in compliance with the License. You may obtain a copy of
7% the License at
8%
9%   http://www.apache.org/licenses/LICENSE-2.0
10%
11% Unless required by applicable law or agreed to in writing, software
12% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14% License for the specific language governing permissions and limitations under
15% the License.
16
17-include_lib("couch_set_view/include/couch_set_view.hrl").
18
19-define(JSON_ENCODE(V), ejson:encode(V)). % couch_db.hrl
20-define(MAX_WAIT_TIME, 600 * 1000).
21
22% from couch_db.hrl
23-define(MIN_STR, <<>>).
24-define(MAX_STR, <<255>>).
25
26-record(view_query_args, {
27    start_key,
28    end_key,
29    start_docid = ?MIN_STR,
30    end_docid = ?MAX_STR,
31    direction = fwd,
32    inclusive_end = true,
33    limit = 10000000000,
34    skip = 0,
35    group_level = 0,
36    view_type = nil,
37    include_docs = false,
38    conflicts = false,
39    stale = false,
40    multi_get = false,
41    callback = nil,
42    list = nil,
43    run_reduce = true,
44    keys = nil,
45    view_name = nil,
46    debug = false,
47    filter = true,
48    type = main
49}).
50
51test_set_name() -> <<"couch_test_set_index_passive_parts">>.
52num_set_partitions() -> 64.
53ddoc_id() -> <<"_design/test">>.
54num_docs() -> 16448.  % keep it a multiple of num_set_partitions()
55
56
57main(_) ->
58    test_util:init_code_path(),
59
60    etap:plan(561),
61    case (catch test()) of
62        ok ->
63            etap:end_tests();
64        Other ->
65            etap:diag(io_lib:format("Test died abnormally: ~p", [Other])),
66            etap:bail(Other)
67    end,
68    ok.
69
70
71test() ->
72    couch_set_view_test_util:start_server(test_set_name()),
73
74    create_set(),
75    add_documents(0, num_docs()),
76
77    % build index
78    _ = get_group_snapshot(),
79
80    ActivePartitions1 = lists:seq(0, num_set_partitions() - 1),
81
82    verify_btrees(ActivePartitions1, fun(I) -> I end),
83
84    FoldFun = fun(PartId, ActivePartsAcc) ->
85        ActivePartsAcc2 = ordsets:del_element(PartId, ActivePartsAcc),
86        ok = couch_set_view:set_partition_states(
87            mapreduce_view, test_set_name(), ddoc_id(), [], [PartId], []),
88        fold_view(ActivePartsAcc2, fun(I) -> I end),
89        ActivePartsAcc2
90    end,
91
92    ActivePartitions2 = lists:foldl(
93        FoldFun,
94        ActivePartitions1,
95        lists:reverse(lists:seq(1, 31, 2))),
96
97    update_documents(0, num_docs(), fun(I) -> I * 5 end),
98    fold_view(ActivePartitions2, fun(I) -> I * 5 end),
99
100    wait_updater_finishes(),
101    update_documents(0, num_docs(), fun(I) -> I end),
102    fold_view(ActivePartitions2, fun(I) -> I end),
103
104    wait_updater_finishes(),
105    verify_btrees(ActivePartitions2, fun(I) -> I end),
106    compact_view_group(),
107    verify_btrees(ActivePartitions2, fun(I) -> I end),
108
109    ActivePartitions3 = lists:foldl(
110        FoldFun,
111        ActivePartitions2,
112        lists:seq(0, 31, 2)),
113
114    update_documents(0, num_docs(), fun(I) -> I * 5 end),
115    fold_view(ActivePartitions3, fun(I) -> I * 5 end),
116
117    wait_updater_finishes(),
118    update_documents(0, num_docs(), fun(I) -> I end),
119    fold_view(ActivePartitions3, fun(I) -> I end),
120
121    wait_updater_finishes(),
122    verify_btrees(ActivePartitions3, fun(I) -> I end),
123    compact_view_group(),
124    verify_btrees(ActivePartitions3, fun(I) -> I end),
125
126    ActivePartitions4 = lists:foldl(
127        FoldFun,
128        ActivePartitions3,
129        lists:seq(32, 63, 2)),
130
131    update_documents(0, num_docs(), fun(I) -> I * I + 3 end),
132    fold_view(ActivePartitions4, fun(I) -> I * I + 3 end),
133
134    wait_updater_finishes(),
135    update_documents(0, num_docs(), fun(I) -> I end),
136    fold_view(ActivePartitions4, fun(I) -> I end),
137
138    wait_updater_finishes(),
139    verify_btrees(ActivePartitions4, fun(I) -> I end),
140    compact_view_group(),
141    verify_btrees(ActivePartitions4, fun(I) -> I end),
142
143    ActivePartitions5 = lists:foldl(
144        FoldFun,
145        ActivePartitions4,
146        lists:reverse(lists:seq(33, 63, 2))),
147
148    update_documents(0, num_docs(), fun(I) -> I * -3 end),
149    fold_view(ActivePartitions5, fun(I) -> I * -3 end),
150
151    wait_updater_finishes(),
152    update_documents(0, num_docs(), fun(I) -> I end),
153    fold_view(ActivePartitions5, fun(I) -> I end),
154
155    wait_updater_finishes(),
156    verify_btrees(ActivePartitions5, fun(I) -> I end),
157    compact_view_group(),
158    verify_btrees(ActivePartitions5, fun(I) -> I end),
159
160    etap:is(
161        ActivePartitions5,
162        [],
163        "Final list of active partitions is empty"),
164
165    FinalActivePartitions = lists:seq(0, num_set_partitions() - 1),
166
167    lists:foreach(
168        fun(PartId) ->
169            ok = couch_set_view:set_partition_states(
170                mapreduce_view, test_set_name(), ddoc_id(), [PartId], [], [])
171        end, FinalActivePartitions),
172
173    fold_view(FinalActivePartitions, fun(I) -> I end),
174    wait_updater_finishes(),
175    verify_btrees(FinalActivePartitions, fun(I) -> I end),
176
177    compact_view_group(),
178
179    fold_view(FinalActivePartitions, fun(I) -> I end),
180    wait_updater_finishes(),
181    verify_btrees(FinalActivePartitions, fun(I) -> I end),
182
183    update_documents(0, num_docs(), fun(I) -> I * 10 + 1 end),
184    fold_view(FinalActivePartitions, fun(I) -> I * 10 + 1 end),
185    wait_updater_finishes(),
186    verify_btrees(FinalActivePartitions, fun(I) -> I * 10 + 1 end),
187
188    compact_view_group(),
189    verify_btrees(FinalActivePartitions, fun(I) -> I * 10 + 1 end),
190    fold_view(FinalActivePartitions, fun(I) -> I * 10 + 1 end),
191
192    couch_set_view_test_util:delete_set_dbs(test_set_name(), num_set_partitions()),
193    couch_set_view_test_util:stop_server(),
194    ok.
195
196
197create_set() ->
198    couch_set_view_test_util:delete_set_dbs(test_set_name(), num_set_partitions()),
199    couch_set_view_test_util:create_set_dbs(test_set_name(), num_set_partitions()),
200    couch_set_view:cleanup_index_files(mapreduce_view, test_set_name()),
201    etap:diag("Creating the set databases (# of partitions: " ++
202        integer_to_list(num_set_partitions()) ++ ")"),
203    DDoc = {[
204        {<<"meta">>, {[{<<"id">>, ddoc_id()}]}},
205        {<<"json">>, {[
206        {<<"language">>, <<"javascript">>},
207        {<<"views">>, {[
208            {<<"view_1">>, {[
209                {<<"map">>, <<"function(doc, meta) { emit(meta.id, doc.value); }">>},
210                {<<"reduce">>, <<"_sum">>}
211            ]}}
212        ]}}
213        ]}}
214    ]},
215    ok = couch_set_view_test_util:update_ddoc(test_set_name(), DDoc),
216    etap:diag("Configuring set view with partitions [0 .. 63] as active"),
217    Params = #set_view_params{
218        max_partitions = num_set_partitions(),
219        active_partitions = lists:seq(0, 63),
220        passive_partitions = [],
221        use_replica_index = false
222    },
223    ok = couch_set_view:define_group(
224        mapreduce_view, test_set_name(), ddoc_id(), Params).
225
226
227add_documents(StartId, Count) ->
228    add_documents(StartId, Count, fun(I) -> I end).
229
230add_documents(StartId, Count, ValueGenFun) ->
231    etap:diag("Adding " ++ integer_to_list(Count) ++ " new documents"),
232    DocList0 = lists:map(
233        fun(I) ->
234            {I rem num_set_partitions(), {[
235                {<<"meta">>, {[{<<"id">>, doc_id(I)}]}},
236                {<<"json">>, {[
237                    {<<"value">>, ValueGenFun(I)}
238                ]}}
239            ]}}
240        end,
241        lists:seq(StartId, StartId + Count - 1)),
242    DocList = [Doc || {_, Doc} <- lists:keysort(1, DocList0)],
243    ok = couch_set_view_test_util:populate_set_sequentially(
244        test_set_name(),
245        lists:seq(0, num_set_partitions() - 1),
246        DocList).
247
248
249update_documents(StartId, NumDocs, ValueGenFun) ->
250    etap:diag("About to update " ++ integer_to_list(NumDocs) ++ " documents"),
251    Dbs = dict:from_list(lists:map(
252        fun(I) ->
253            {ok, Db} = couch_set_view_test_util:open_set_db(test_set_name(), I),
254            {I, Db}
255        end,
256        lists:seq(0, num_set_partitions() - 1))),
257    Docs = lists:foldl(
258        fun(I, Acc) ->
259            Doc = couch_doc:from_json_obj({[
260                {<<"meta">>, {[{<<"id">>, doc_id(I)}]}},
261                {<<"json">>, {[
262                    {<<"value">>, ValueGenFun(I)}
263                ]}}
264            ]}),
265            DocList = case orddict:find(I rem num_set_partitions(), Acc) of
266            {ok, L} ->
267                L;
268            error ->
269                []
270            end,
271            orddict:store(I rem num_set_partitions(), [Doc | DocList], Acc)
272        end,
273        orddict:new(), lists:seq(StartId, StartId + NumDocs - 1)),
274    [] = orddict:fold(
275        fun(I, DocList, Acc) ->
276            Db = dict:fetch(I, Dbs),
277            ok = couch_db:update_docs(Db, DocList, [sort_docs]),
278            Acc
279        end,
280        [], Docs),
281    etap:diag("Updated " ++ integer_to_list(NumDocs) ++ " documents"),
282    ok = lists:foreach(fun({_, Db}) -> ok = couch_db:close(Db) end, dict:to_list(Dbs)).
283
284
285doc_id(I) ->
286    iolist_to_binary(io_lib:format("doc_~8..0b", [I])).
287
288
289fold_view(ActiveParts, ValueGenFun) ->
290    {ok, FoldView, Group, _} = couch_set_view:get_reduce_view(
291        test_set_name(), ddoc_id(), <<"view_1">>,
292        #set_view_group_req{stale = false, debug = true}),
293
294    etap:diag("Verifying some btree metadata"),
295    #set_view_group{
296        views = [#set_view{}],
297        index_header = #set_view_index_header{
298            seqs = HeaderUpdateSeqs,
299            abitmask = Abitmask,
300            pbitmask = Pbitmask,
301            cbitmask = Cbitmask
302        }
303    } = Group,
304
305    ExpectedABitmask = couch_set_view_util:build_bitmask(ActiveParts),
306    ExpectedPBitmask = couch_set_view_util:build_bitmask(
307        ordsets:subtract(lists:seq(0, num_set_partitions() - 1), ActiveParts)),
308    DbSeqsActive = couch_set_view_test_util:get_db_seqs(test_set_name(), ActiveParts),
309    ExpectedViewReduction = lists:sum(
310        [ValueGenFun(I) || I <- lists:seq(0, num_docs() - 1),
311            ordsets:is_element(I rem num_set_partitions(), ActiveParts)]),
312
313    etap:is(
314        [{P, S} || {P, S} <- HeaderUpdateSeqs, ordsets:is_element(P, ActiveParts)],
315        DbSeqsActive,
316        "Header has right active update seqs list"),
317    etap:is(Abitmask, ExpectedABitmask, "Header has right active bitmask"),
318    etap:is(Pbitmask, ExpectedPBitmask, "Header has right passive bitmask"),
319    etap:is(Cbitmask, 0, "Header has right cleanup bitmask"),
320
321    ViewArgs = #view_query_args{
322        run_reduce = true,
323        view_name = <<"view_1">>
324    },
325
326    etap:diag("Folding the view with ?group=false"),
327
328    FullRedFoldFun = fun(_Key, {json, Red}, Acc) ->
329        {ok, [Red | Acc]}
330    end,
331    {ok, FullRedResult} = couch_set_view:fold_reduce(
332        Group, FoldView, FullRedFoldFun, [], ViewArgs),
333    case ActiveParts of
334    [] ->
335        etap:is(
336            FullRedResult,
337            [],
338           "Got correct fold reduce value with ?group=false");
339    _ ->
340        etap:is(
341            ejson:decode(FullRedResult),
342            ExpectedViewReduction,
343            "Got correct fold reduce value with ?group=false")
344    end,
345
346    etap:diag("Folding the view with ?group=true"),
347
348    PerKeyRedFoldFun = fun(Key, {json, Red}, {NextVal, I}) ->
349        ExpectedKey = {json, ejson:encode(doc_id(NextVal))},
350        ExpectedRed = ValueGenFun(NextVal),
351        case {Key, ejson:decode(Red)} of
352        {ExpectedKey, ExpectedRed} ->
353            ok;
354        _ ->
355            etap:bail("Unexpected KV at view fold iteration " ++ integer_to_list(I))
356        end,
357        {ok, {next_val(NextVal, ActiveParts), I + 1}}
358    end,
359    {ok, {_, PerKeyRedResult}} = couch_set_view:fold_reduce(
360        Group, FoldView, PerKeyRedFoldFun,
361        {case ActiveParts of [] -> nil; _ -> hd(ActiveParts) end, 0},
362        ViewArgs#view_query_args{group_level = exact}),
363    etap:is(
364        PerKeyRedResult,
365        length(ActiveParts) * (num_docs() div num_set_partitions()),
366        "Got correct fold reduce value with ?group=true"),
367    ok.
368
369
370next_val(I, ActiveParts) ->
371    case ordsets:is_element((I + 1) rem num_set_partitions(), ActiveParts) of
372    true ->
373        I + 1;
374    false ->
375        next_val(I + 1, ActiveParts)
376    end.
377
378
379verify_btrees(ActiveParts, ValueGenFun) ->
380    Group = get_group_snapshot(),
381    etap:diag("Verifying btrees"),
382
383    #set_view_group{
384        id_btree = IdBtree,
385        views = [View1],
386        index_header = #set_view_index_header{
387            seqs = HeaderUpdateSeqs,
388            abitmask = Abitmask,
389            pbitmask = Pbitmask,
390            cbitmask = Cbitmask
391        }
392    } = Group,
393    #set_view{
394        indexer = #mapreduce_view{
395            btree = View1Btree
396        }
397    } = View1,
398    ExpectedBitmask = couch_set_view_util:build_bitmask(
399        lists:seq(0, num_set_partitions() - 1)),
400    ExpectedABitmask = couch_set_view_util:build_bitmask(ActiveParts),
401    ExpectedPBitmask = couch_set_view_util:build_bitmask(
402        ordsets:subtract(lists:seq(0, num_set_partitions() - 1), ActiveParts)),
403    DbSeqs = couch_set_view_test_util:get_db_seqs(
404        test_set_name(), lists:seq(0, num_set_partitions() - 1)),
405    ExpectedKVCount = num_docs(),
406    ExpectedBtreeViewReduction = lists:sum(
407        [ValueGenFun(I) || I <- lists:seq(0, num_docs() - 1)]),
408
409    etap:is(
410        couch_set_view_test_util:full_reduce_id_btree(Group, IdBtree),
411        {ok, {ExpectedKVCount, ExpectedBitmask}},
412        "Id Btree has the right reduce value"),
413    etap:is(
414        couch_set_view_test_util:full_reduce_view_btree(Group, View1Btree),
415        {ok, {ExpectedKVCount, [ExpectedBtreeViewReduction], ExpectedBitmask}},
416        "View1 Btree has the right reduce value"),
417
418    etap:is(HeaderUpdateSeqs, DbSeqs, "Header has right update seqs list"),
419    etap:is(Abitmask, ExpectedABitmask, "Header has right active bitmask"),
420    etap:is(Pbitmask, ExpectedPBitmask, "Header has right passive bitmask"),
421    etap:is(Cbitmask, 0, "Header has right cleanup bitmask"),
422
423    etap:diag("Verifying the Id Btree"),
424    MaxPerPart = num_docs() div num_set_partitions(),
425    {ok, _, {_, _, _, IdBtreeFoldResult}} = couch_set_view_test_util:fold_id_btree(
426        Group,
427        IdBtree,
428        fun(Kv, _, {P0, I0, C0, It}) ->
429            case C0 >= MaxPerPart of
430            true ->
431                P = P0 + 1,
432                I = P,
433                C = 1;
434            false ->
435                P = P0,
436                I = I0,
437                C = C0 + 1
438            end,
439            true = (P < num_set_partitions()),
440            DocId = doc_id(I),
441            Value = [{View1#set_view.id_num, DocId}],
442            ExpectedKv = {<<P:16, DocId/binary>>, {P, Value}},
443            case ExpectedKv =:= Kv of
444            true ->
445                ok;
446            false ->
447                etap:bail("Id Btree has an unexpected KV at iteration " ++ integer_to_list(It))
448            end,
449            {ok, {P, I + num_set_partitions(), C, It + 1}}
450        end,
451        {0, 0, 0, 0}, []),
452    etap:is(IdBtreeFoldResult, ExpectedKVCount,
453        "Id Btree has " ++ integer_to_list(ExpectedKVCount) ++ " entries"),
454
455    etap:diag("Verifying the View1 Btree"),
456    {ok, _, View1BtreeFoldResult} = couch_set_view_test_util:fold_view_btree(
457        Group,
458        View1Btree,
459        fun(Kv, _, I) ->
460            PartId = I rem num_set_partitions(),
461            DocId = doc_id(I),
462            ExpectedKv = {{DocId, DocId}, {PartId, ValueGenFun(I)}},
463            case ExpectedKv =:= Kv of
464            true ->
465                ok;
466            false ->
467                etap:bail("View1 Btree has an unexpected KV at iteration " ++ integer_to_list(I))
468            end,
469            {ok, I + 1}
470        end,
471        0, []),
472    etap:is(View1BtreeFoldResult, ExpectedKVCount,
473        "View1 Btree has " ++ integer_to_list(ExpectedKVCount) ++ " entries"),
474    ok.
475
476
477get_group_snapshot() ->
478    GroupPid = couch_set_view:get_group_pid(
479        mapreduce_view, test_set_name(), ddoc_id(), prod),
480    {ok, Group, 0} = gen_server:call(
481        GroupPid, #set_view_group_req{stale = false, debug = true}, infinity),
482    Group.
483
484
485wait_updater_finishes() ->
486    GroupPid = couch_set_view:get_group_pid(
487        mapreduce_view, test_set_name(), ddoc_id(), prod),
488    {ok, UpPid} = gen_server:call(GroupPid, updater_pid, infinity),
489    case UpPid of
490    nil ->
491        ok;
492    _ when is_pid(UpPid) ->
493        Ref = erlang:monitor(process, UpPid),
494        receive
495        {'DOWN', Ref, process, UpPid, {updater_finished, _}} ->
496            ok;
497        {'DOWN', Ref, process, UpPid, noproc} ->
498            ok;
499        {'DOWN', Ref, process, UpPid, Reason} ->
500            etap:bail("Failure updating main group: " ++ couch_util:to_list(Reason))
501        after ?MAX_WAIT_TIME ->
502            etap:bail("Timeout waiting for main group update")
503        end
504    end.
505
506
507compact_view_group() ->
508    {ok, CompactPid} = couch_set_view_compactor:start_compact(
509        mapreduce_view, test_set_name(), ddoc_id(), main),
510    etap:diag("Waiting for main view group compaction to finish"),
511    Ref = erlang:monitor(process, CompactPid),
512    receive
513    {'DOWN', Ref, process, CompactPid, normal} ->
514        ok;
515    {'DOWN', Ref, process, CompactPid, Reason} ->
516        etap:bail("Failure compacting main view group: " ++ couch_util:to_list(Reason))
517    after ?MAX_WAIT_TIME ->
518        etap:bail("Timeout waiting for main view group compaction to finish")
519    end.
520