1#!/usr/bin/env escript
2%% -*- Mode: Erlang; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
3%%! -smp enable
4
5% Licensed under the Apache License, Version 2.0 (the "License"); you may not
6% use this file except in compliance with the License. You may obtain a copy of
7% the License at
8%
9%   http://www.apache.org/licenses/LICENSE-2.0
10%
11% Unless required by applicable law or agreed to in writing, software
12% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14% License for the specific language governing permissions and limitations under
15% the License.
16
17-define(JSON_ENCODE(V), ejson:encode(V)). % couch_db.hrl
18-define(MAX_WAIT_TIME, 900 * 1000).
19
20-include_lib("couch_set_view/include/couch_set_view.hrl").
21
% Name of the set that every couch_set_view call in this script operates on.
test_set_name() ->
    <<"couch_test_set_index_updater_add_more_passive_partitions">>.

% Total number of partitions in the test set.
num_set_partitions() ->
    64.

% Id of the design document that defines the test views.
ddoc_id() ->
    <<"_design/test">>.

% Size of the initial document batch.
% Keep it a multiple of num_set_partitions().
num_docs_0() ->
    78144.
26
27
% Script entry point: prepares the code path, skips the whole plan on
% platforms where the test cannot run, and otherwise runs test/0 under an
% etap plan of 40 assertions. Always returns ok.
main(_) ->
    test_util:init_code_path(),

    % If the release string is not a plain integer (the case for Erlang
    % versions < 17), it is treated as release 0.
    OtpRelease = try
        list_to_integer(erlang:system_info(otp_release))
    catch error:badarg ->
        0
    end,
    % Erlang < 17.0 on Windows doesn't support long file paths, hence disable
    % that test on that platform.
    case {OtpRelease < 17, os:type()} of
    {true, {win32, _}} ->
        etap:plan(skip);
    _ ->
        etap:plan(40),
        Outcome = (catch test()),
        case Outcome of
            ok ->
                etap:end_tests();
            _ ->
                io:format(standard_error, "Test died abnormally: ~p", [Outcome]),
                etap:bail(Outcome)
        end
    end,
    ok.
53
54
% Runs the whole scenario: sets up the server and the set databases, exercises
% partition state changes during the initial build and during an incremental
% update, then runs the MB-8109 regression check and tears everything down.
test() ->
    SetName = test_set_name(),
    NumParts = num_set_partitions(),
    HalfParts = NumParts div 2,

    couch_set_view_test_util:start_server(SetName),

    couch_set_view_test_util:delete_set_dbs(SetName, NumParts),
    couch_set_view_test_util:create_set_dbs(SetName, NumParts),

    create_set(),

    IdentityGen = fun(N) -> N end,
    InitialDocCount = num_docs_0(),
    update_documents(0, InitialDocCount, IdentityGen),

    etap:diag("Testing state transitions during initial index build"),
    test_state_changes_while_updater_running(IdentityGen, InitialDocCount),

    % Reset the layout: nothing active, lower half of the partitions passive,
    % upper half handed to the third (cleanup) list.
    LowerHalf = lists:seq(0, HalfParts - 1),
    UpperHalf = lists:seq(HalfParts, NumParts - 1),
    ok = couch_set_view:set_partition_states(
        mapreduce_view, SetName, ddoc_id(), [], LowerHalf, UpperHalf),

    TripleGen = fun(N) -> N * 3 end,
    GrownDocCount = num_docs_0() + (5 * NumParts),
    update_documents(0, GrownDocCount, TripleGen),

    etap:diag("Testing state transitions during incremental index update"),
    {ActiveParts, PassiveParts} =
        test_state_changes_while_updater_running(TripleGen, GrownDocCount),

    % Test adding a new passive partition to the updater while it's running and
    % that partition ends up in the pending transition.
    % This failed to happen in MB-8109.
    test_add_passive_partition_to_pending_transition_while_updater_running(
        ActiveParts, PassiveParts, GrownDocCount, IdentityGen),

    couch_set_view_test_util:delete_set_dbs(SetName, NumParts),
    ok = timer:sleep(1000),
    couch_set_view_test_util:stop_server(),
    ok.
93
94
% Starts an updater with the pause option, changes partition states one
% partition at a time before releasing it with a continue message, and then
% verifies the index both right after the update and after the group process
% has been shut down and respawned.
%
% ValueGenFun and NumDocs are handed to verify_btrees/4 and must describe the
% documents currently stored in the set.
% Returns {ActiveParts, PassiveParts}, the partition states that were verified.
test_state_changes_while_updater_running(ValueGenFun, NumDocs) ->
    GroupPid = couch_set_view:get_group_pid(
        mapreduce_view, test_set_name(), ddoc_id(), prod),
    NewPassiveParts = lists:seq(num_set_partitions() div 2, num_set_partitions() - 1),

    MarkPassive = fun(PartId) ->
        ok = couch_set_view:set_partition_states(
            mapreduce_view, test_set_name(), ddoc_id(), [], [PartId], [])
    end,
    MarkActive = fun(PartId) ->
        ok = couch_set_view:set_partition_states(
            mapreduce_view, test_set_name(), ddoc_id(), [PartId], [], [])
    end,

    % Trigger index update. Completion is detected through the stale = false
    % group request further down, so the updater process is not monitored
    % here (a monitor would only leave a stray 'DOWN' message in the mailbox).
    {ok, UpdaterPid} = gen_server:call(GroupPid, {start_updater, [pause]}, infinity),

    lists:foreach(MarkPassive, NewPassiveParts),

    etap:diag("Added more passive partitions while updater is running"),

    % Odd partitions become active, even ones stay passive.
    ActiveParts0 = lists:seq(1, num_set_partitions() - 1, 2),
    PassiveParts0 = lists:seq(0, num_set_partitions() - 1, 2),

    etap:diag("Changing the state of some partitions from passive to active " ++
                  "while the updater is running"),
    lists:foreach(MarkActive, ActiveParts0),

    % Every other odd partition (1, 5, 9, ...) goes back to passive.
    PassiveParts = ordsets:union(PassiveParts0, lists:seq(1, num_set_partitions() - 1, 4)),
    ActiveParts = ordsets:subtract(ActiveParts0, PassiveParts),

    etap:diag("Changing the state of some partitions from active back to passive " ++
                  "while the updater is running"),
    lists:foreach(MarkPassive, PassiveParts),

    UpdaterPid ! continue,
    % Wait for the index update to finish
    {ok, _, _} = gen_server:call(
        GroupPid, #set_view_group_req{stale = false, debug = true}, infinity),
    verify_btrees(ValueGenFun, NumDocs, ActiveParts, PassiveParts),

    etap:diag("Shutting down group pid, and verifying last written header is good"),
    couch_util:shutdown_sync(GroupPid),
    wait_group_respawn(GroupPid, 3000),

    verify_btrees(ValueGenFun, NumDocs, ActiveParts, PassiveParts),
    {ActiveParts, PassiveParts}.
149
150
% Regression check for MB-8109: partitions that are marked for cleanup and
% then switched to passive while an updater is running must show up in the
% group's pending transition, and must be applied once the updater is done.
%
% Active / Passive are the current active and passive partition lists (the
% first element of each is used), NumDocs and ValueGenFun describe the
% documents written to bump the partition sequence numbers.
% Emits 4 etap assertions and returns ok.
test_add_passive_partition_to_pending_transition_while_updater_running(
        Active, Passive, NumDocs, ValueGenFun) ->
    GroupPid = couch_set_view:get_group_pid(
        mapreduce_view, test_set_name(), ddoc_id(), prod),
    ok = gen_server:call(GroupPid, {set_auto_cleanup, false}, infinity),
    Parts = ordsets:from_list([hd(Active), hd(Passive)]),
    % Put one active and one passive partition into the cleanup list.
    ok = couch_set_view:set_partition_states(
        mapreduce_view, test_set_name(), ddoc_id(), [], [], Parts),

    % Bump partition seq numbers so that the updater can be triggered.
    update_documents(0, NumDocs, ValueGenFun),
    {ok, UpdaterPid} = gen_server:call(GroupPid, {start_updater, [pause]}, infinity),
    % Monitor immediately: if the updater exits while the state changes below
    % are being applied, the 'DOWN' message still carries its real exit
    % reason instead of noproc.
    Ref = erlang:monitor(process, UpdaterPid),

    % Now mark partitions under cleanup as passive while the updater is running
    ok = couch_set_view:set_partition_states(
        mapreduce_view, test_set_name(), ddoc_id(), [], Parts, []),

    Group = get_group_snapshot(),
    PendingTrans = ?set_pending_transition(Group),
    PendingPassive = ?pending_transition_passive(PendingTrans),

    etap:is(PendingPassive, Parts,
            "Correct list of passive partitions in pending transition"),

    UpdaterPid ! continue,
    etap:diag("Waiting for updater to finish"),
    receive
    {'DOWN', Ref, _, _, {updater_finished, _}} ->
        etap:diag("Updater finished");
    {'DOWN', Ref, _, _, {updater_error, shutdown}} ->
        etap:diag("Updater restarted");
    {'DOWN', Ref, _, _, Reason} ->
        etap:bail("Updater finished with unexpected reason: " ++ couch_util:to_list(Reason))
    after ?MAX_WAIT_TIME ->
        etap:bail("Timeout waiting for updater to finish")
    end,

    Group2 = get_group_snapshot(),
    PendingTrans2 = ?set_pending_transition(Group2),
    etap:is(PendingTrans2, nil, "Pending transition is empty after updater finished"),

    PartsMask = couch_set_view_util:build_bitmask(Parts),
    % -1 marks a partition that is missing from the snapshot's seq list.
    PartSeqs = [couch_util:get_value(P, ?set_seqs(Group2), -1) || P <- Parts],

    etap:is(PartSeqs, [0 || _P <- Parts],
            "Group snapshot has the right seq numbers for the new passive partitions"),
    etap:is((?set_pbitmask(Group2) band PartsMask),
            PartsMask,
            "Group snapshot has new passive partitions set in its passive bitmask"),
    ok.
202
203
% Polls every 20 ms until the group pid registered for the test design
% document differs from OldPid. T is the remaining budget in milliseconds;
% once it drops to zero or below the test run is bailed out.
wait_group_respawn(_OldPid, T) when T =< 0 ->
    etap:bail("Timeout waiting for group respawn");
wait_group_respawn(OldPid, T) ->
    CurrentPid = couch_set_view:get_group_pid(
        mapreduce_view, test_set_name(), ddoc_id(), prod),
    if
    CurrentPid =:= OldPid ->
        ok = timer:sleep(20),
        wait_group_respawn(OldPid, T - 20);
    true ->
        ok
    end.
216
217
% Asks the current group process of the test design document for its group
% record (request_group call) and returns it.
get_group_snapshot() ->
    {ok, Snapshot} = gen_server:call(
        couch_set_view:get_group_pid(
            mapreduce_view, test_set_name(), ddoc_id(), prod),
        request_group,
        infinity),
    Snapshot.
223
224
% Recreates the set databases, stores the test design document (two views,
% both reduced with _count) and defines the view group with the lower half of
% the partitions passive and no active partitions.
create_set() ->
    SetName = test_set_name(),
    NumParts = num_set_partitions(),
    couch_set_view_test_util:delete_set_dbs(SetName, NumParts),
    couch_set_view_test_util:create_set_dbs(SetName, NumParts),
    couch_set_view:cleanup_index_files(mapreduce_view, SetName),
    etap:diag("Creating the set databases (# of partitions: " ++
        integer_to_list(NumParts) ++ ")"),

    % Both views share the same shape and differ only in the map function.
    CountView = fun(MapSource) ->
        {[
            {<<"map">>, MapSource},
            {<<"reduce">>, <<"_count">>}
        ]}
    end,
    Views = {[
        {<<"view_1">>,
            CountView(<<"function(doc, meta) { emit(meta.id, doc.value); }">>)},
        {<<"view_2">>,
            CountView(<<"function(doc, meta) { emit(meta.id, meta.id); }">>)}
    ]},
    Meta = {[{<<"id">>, ddoc_id()}]},
    Json = {[
        {<<"language">>, <<"javascript">>},
        {<<"views">>, Views}
    ]},
    DDoc = {[
        {<<"meta">>, Meta},
        {<<"json">>, Json}
    ]},
    ok = couch_set_view_test_util:update_ddoc(SetName, DDoc),

    etap:diag("Configuring set view with partitions [0 .. 31] as passive"),
    GroupParams = #set_view_params{
        max_partitions = NumParts,
        active_partitions = [],
        passive_partitions = lists:seq(0, (NumParts div 2) - 1),
        use_replica_index = false
    },
    ok = couch_set_view:define_group(
        mapreduce_view, SetName, ddoc_id(), GroupParams).
257
258
% Writes Count documents with ids StartId .. StartId + Count - 1 into the set.
% Each document's "value" field is ValueGenFun(Id). The documents are ordered
% by Id rem num_set_partitions() (stable, so ids stay ascending within each
% group) before being passed to populate_set_sequentially/3.
update_documents(StartId, Count, ValueGenFun) ->
    etap:diag("Updating " ++ integer_to_list(Count) ++ " documents"),
    NumParts = num_set_partitions(),
    Tagged = [
        {Id rem NumParts, {[
            {<<"meta">>, {[{<<"id">>, doc_id(Id)}]}},
            {<<"json">>, {[
                {<<"value">>, ValueGenFun(Id)}
            ]}}
        ]}}
        || Id <- lists:seq(StartId, StartId + Count - 1)
    ],
    {_Tags, DocList} = lists:unzip(lists:keysort(1, Tagged)),
    ok = couch_set_view_test_util:populate_set_sequentially(
        test_set_name(),
        lists:seq(0, NumParts - 1),
        DocList).
276
277
% Builds the document id for index I: the prefix "doc_" followed by I as a
% decimal number zero-padded to 8 digits, returned as a binary.
doc_id(I) ->
    Formatted = io_lib:format("doc_~8..0b", [I]),
    iolist_to_binary(Formatted).
280
281
% Verifies the current group snapshot against the expected index contents.
%
% ValueGenFun maps a document index to the value view_1 must have emitted for
% it, NumDocs is the number of documents in the set, and ActiveParts /
% PassiveParts are the ordsets of partitions expected in each state.
% Emits 9 etap assertions (reduce values, header fields, entry counts) and
% bails out on the first key/value pair that is not the expected one.
verify_btrees(ValueGenFun, NumDocs, ActiveParts, PassiveParts) ->
    Group = get_group_snapshot(),
    #set_view_group{
        id_btree = IdBtree,
        views = [View1, View2],
        index_header = Header
    } = Group,
    #set_view_index_header{
        seqs = HeaderUpdateSeqs,
        abitmask = Abitmask,
        pbitmask = Pbitmask,
        cbitmask = Cbitmask
    } = Header,
    #set_view{indexer = #mapreduce_view{btree = View1Btree}} = View1,
    #set_view{indexer = #mapreduce_view{btree = View2Btree}} = View2,

    NumParts = num_set_partitions(),
    AllParts = ordsets:union(ActiveParts, PassiveParts),
    ExpectedActiveBitmask = couch_set_view_util:build_bitmask(ActiveParts),
    ExpectedPassiveBitmask = couch_set_view_util:build_bitmask(PassiveParts),
    ExpectedIndexedBitmask = ExpectedActiveBitmask bor ExpectedPassiveBitmask,
    DbSeqs = couch_set_view_test_util:get_db_seqs(test_set_name(), AllParts),
    % Only documents that live in an indexed partition are expected.
    ExpectedKVCount = length(lists:filter(
        fun(N) -> ordsets:is_element(N rem NumParts, AllParts) end,
        lists:seq(0, NumDocs - 1))),

    etap:is(
        couch_set_view_test_util:full_reduce_id_btree(Group, IdBtree),
        {ok, {ExpectedKVCount, ExpectedIndexedBitmask}},
        "Id Btree has the right reduce value"),
    etap:is(
        couch_set_view_test_util:full_reduce_view_btree(Group, View1Btree),
        {ok, {ExpectedKVCount, [ExpectedKVCount], ExpectedIndexedBitmask}},
        "View1 Btree has the right reduce value"),

    etap:is(HeaderUpdateSeqs, DbSeqs, "Header has right update seqs list"),
    etap:is(Abitmask, ExpectedActiveBitmask, "Header has right active bitmask"),
    etap:is(Pbitmask, ExpectedPassiveBitmask, "Header has right passive bitmask"),
    etap:is(Cbitmask, 0, "Header has right cleanup bitmask"),

    etap:diag("Verifying the Id Btree"),
    MaxPerPart = NumDocs div NumParts,
    % Accumulator: {partitions still to visit (head = current partition),
    %               next expected doc index, entries seen in the current
    %               partition, total entries seen}.
    % The expected keys are grouped by partition, so after MaxPerPart entries
    % the walk moves on to the next partition and restarts at its first doc.
    IdFoldFun = fun(Kv, _, {Parts, I0, C0, It}) ->
        {RestParts, I, C} = case C0 >= MaxPerPart of
            true ->
                [_Done | [Next | _] = Remaining] = Parts,
                {Remaining, Next, 1};
            false ->
                {Parts, I0, C0 + 1}
        end,
        [P | _] = RestParts,
        true = (P < NumParts),
        DocId = doc_id(I),
        Value = [{View1#set_view.id_num, DocId}, {View2#set_view.id_num, DocId}],
        ExpectedKv = {<<P:16, DocId/binary>>, {P, Value}},
        ExpectedKv =:= Kv orelse
            etap:bail("Id Btree has an unexpected KV at iteration " ++ integer_to_list(It)),
        {ok, {RestParts, I + NumParts, C, It + 1}}
    end,
    {ok, _, {_, _, _, IdBtreeFoldResult}} = couch_set_view_test_util:fold_id_btree(
        Group, IdBtree, IdFoldFun, {AllParts, hd(AllParts), 0, 0}, []),
    etap:is(IdBtreeFoldResult, ExpectedKVCount,
        "Id Btree has " ++ integer_to_list(ExpectedKVCount) ++ " entries"),

    etap:diag("Verifying the View1 Btree"),
    % View entries are expected in ascending doc index order, skipping the
    % indexes whose partition is not in AllParts.
    View1FoldFun = fun(Kv, _, {I, Seen}) ->
        DocId = doc_id(I),
        ExpectedKv = {{DocId, DocId}, {I rem NumParts, ValueGenFun(I)}},
        ExpectedKv =:= Kv orelse
            etap:bail("View1 Btree has an unexpected KV at iteration " ++ integer_to_list(Seen + 1)),
        {ok, {next_i(I, AllParts), Seen + 1}}
    end,
    {ok, _, {_, View1BtreeFoldResult}} = couch_set_view_test_util:fold_view_btree(
        Group, View1Btree, View1FoldFun, {hd(AllParts), 0}, []),
    etap:is(View1BtreeFoldResult, ExpectedKVCount,
        "View1 Btree has " ++ integer_to_list(ExpectedKVCount) ++ " entries"),

    etap:diag("Verifying the View2 Btree"),
    View2FoldFun = fun(Kv, _, {I, Seen}) ->
        DocId = doc_id(I),
        ExpectedKv = {{DocId, DocId}, {I rem NumParts, DocId}},
        ExpectedKv =:= Kv orelse
            etap:bail("View2 Btree has an unexpected KV at iteration " ++ integer_to_list(Seen + 1)),
        {ok, {next_i(I, AllParts), Seen + 1}}
    end,
    {ok, _, {_, View2BtreeFoldResult}} = couch_set_view_test_util:fold_view_btree(
        Group, View2Btree, View2FoldFun, {hd(AllParts), 0}, []),
    etap:is(View2BtreeFoldResult, ExpectedKVCount,
        "View2 Btree has " ++ integer_to_list(ExpectedKVCount) ++ " entries"),
    ok.
400
401
% Returns the smallest integer greater than I whose partition
% (value rem num_set_partitions()) is an element of the ordset Parts.
next_i(I, Parts) ->
    Candidate = I + 1,
    InParts = ordsets:is_element(Candidate rem num_set_partitions(), Parts),
    if
    InParts ->
        Candidate;
    true ->
        next_i(Candidate, Parts)
    end.
409