1#!/usr/bin/env escript
%% -*- Mode: Erlang; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
3%%! -smp enable
4
5% Licensed under the Apache License, Version 2.0 (the "License"); you may not
6% use this file except in compliance with the License. You may obtain a copy of
7% the License at
8%
9%   http://www.apache.org/licenses/LICENSE-2.0
10%
11% Unless required by applicable law or agreed to in writing, software
12% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14% License for the specific language governing permissions and limitations under
15% the License.
16
17-define(MAX_WAIT_TIME, 600 * 1000).
18
19-include_lib("couch_set_view/include/couch_set_view.hrl").
20
%% Name of the database set used by this test.
test_set_name() -> <<"couch_test_set_index_replica_compact">>.
%% Total number of partition databases created for the set.
num_set_partitions() -> 64.
%% ID of the design document that defines the test view.
ddoc_id() -> <<"_design/test">>.
%% Number of documents populated across the partition databases.
num_docs() -> 24128.
25
26
%% escript entry point. Declares the etap plan, runs the test and
%% reports either a clean end or an abnormal death.
main(_) ->
    test_util:init_code_path(),

    etap:plan(26),
    %% `catch` (not try) on purpose: a non-ok return value and an
    %% exception are both reported the same way below.
    Result = (catch test()),
    case Result of
        ok ->
            etap:end_tests();
        Failure ->
            etap:diag(io_lib:format("Test died abnormally: ~p", [Failure])),
            etap:bail(Failure)
    end,
    ok.
39
40
%% Main test flow: populate the set databases, configure partitions
%% [8 .. 63] as replicas, run a full replica update, remove the replica
%% partitions, and then compact the replica group. Verifies that the
%% compaction replaces only the replica group's fd/ref_counter while the
%% main group's processes are left untouched, and that stats and disk
%% size reflect the compaction.
test() ->
    couch_set_view_test_util:start_server(test_set_name()),

    couch_set_view_test_util:delete_set_dbs(test_set_name(), num_set_partitions()),
    couch_set_view_test_util:create_set_dbs(test_set_name(), num_set_partitions()),

    populate_set(),

    etap:diag("Marking partitions [ 8 .. 63 ] as replicas"),
    ok = couch_set_view:add_replica_partitions(
        mapreduce_view, test_set_name(), ddoc_id(), lists:seq(8, 63)),

    %% Check the group info both before and after the full update; the
    %% expected partition sets are the same in both cases.
    verify_group_info_before_replica_removal(),
    wait_for_replica_full_update(),
    verify_group_info_before_replica_removal(),

    etap:diag("Removing partitions [ 8 .. 63 ] from replica set"),
    ok = couch_set_view:remove_replica_partitions(
        mapreduce_view, test_set_name(), ddoc_id(), lists:seq(8, 63)),
    verify_group_info_after_replica_removal(),

    %% Snapshot disk size and group records before compaction so they can
    %% be compared against the post-compaction state.
    DiskSizeBefore = replica_index_disk_size(),

    {MainGroupBefore, RepGroupBefore} = get_group_snapshots(),

    etap:diag("Triggering replica group compaction"),
    {ok, CompactPid} = couch_set_view_compactor:start_compact(
        mapreduce_view, test_set_name(), ddoc_id(), replica),
    Ref = erlang:monitor(process, CompactPid),
    etap:diag("Waiting for replica group compaction to finish"),
    receive
    {'DOWN', Ref, process, CompactPid, normal} ->
        ok;
    {'DOWN', Ref, process, CompactPid, noproc} ->
        %% Compactor finished before the monitor was established.
        ok;
    {'DOWN', Ref, process, CompactPid, Reason} ->
        etap:bail("Failure compacting replica group: " ++ couch_util:to_list(Reason))
    after ?MAX_WAIT_TIME ->
        etap:bail("Timeout waiting for replica group compaction to finish")
    end,

    {MainGroupAfter, RepGroupAfter} = get_group_snapshots(),

    %% Replica compaction must not touch the main group's processes.
    etap:is(
        MainGroupAfter#set_view_group.ref_counter,
        MainGroupBefore#set_view_group.ref_counter,
        "Same ref counter for main group after replica compaction"),
    etap:is(
        MainGroupAfter#set_view_group.fd,
        MainGroupBefore#set_view_group.fd,
        "Same fd for main group after replica compaction"),

    etap:is(
        is_process_alive(MainGroupBefore#set_view_group.ref_counter),
        true,
        "Main group's ref counter still alive"),
    etap:is(
        is_process_alive(MainGroupBefore#set_view_group.fd),
        true,
        "Main group's fd still alive"),

    etap:is(
        couch_ref_counter:count(MainGroupAfter#set_view_group.ref_counter),
        1,
        "Main group's ref counter count is 1"),

    %% The replica group, however, must have switched to a new fd and a
    %% new ref counter, and the old ones must be gone.
    etap:isnt(
        RepGroupAfter#set_view_group.ref_counter,
        RepGroupBefore#set_view_group.ref_counter,
        "Different ref counter for replica group after replica compaction"),
    etap:isnt(
        RepGroupAfter#set_view_group.fd,
        RepGroupBefore#set_view_group.fd,
        "Different fd for replica group after replica compaction"),

    etap:is(
        is_process_alive(RepGroupBefore#set_view_group.ref_counter),
        false,
        "Old replica group ref counter is dead"),

    etap:is(
        is_process_alive(RepGroupBefore#set_view_group.fd),
        false,
        "Old replica group fd is dead"),

    etap:is(
        couch_ref_counter:count(RepGroupAfter#set_view_group.ref_counter),
        1,
        "Replica group's new ref counter count is 1"),

    %% Compaction also performs the pending cleanup, so both counters
    %% should have been bumped exactly once.
    RepGroupInfo = get_replica_group_info(),
    {Stats} = couch_util:get_value(stats, RepGroupInfo),
    etap:is(couch_util:get_value(compactions, Stats), 1, "Replica had 1 full compaction in stats"),
    etap:is(couch_util:get_value(cleanups, Stats), 1, "Replica had 1 full cleanup in stats"),
    verify_group_info_after_replica_compact(),

    DiskSizeAfter = replica_index_disk_size(),
    etap:is(DiskSizeAfter < DiskSizeBefore, true, "Index file size is smaller after compaction"),

    couch_set_view_test_util:delete_set_dbs(test_set_name(), num_set_partitions()),
    couch_set_view_test_util:stop_server(),
    ok.
143
144
%% Fetch the current main and replica #set_view_group{} records (with
%% debug info). The ref counters acquired by the two gen_server calls
%% are dropped immediately so the snapshots don't keep the groups pinned.
get_group_snapshots() ->
    MainPid = couch_set_view:get_group_pid(
        mapreduce_view, test_set_name(), ddoc_id(), prod),
    Req = #set_view_group_req{stale = false, debug = true},
    {ok, MainGroup, 0} = gen_server:call(MainPid, Req, infinity),
    {ok, RepGroup, 0} = gen_server:call(
        MainGroup#set_view_group.replica_pid, Req, infinity),
    couch_ref_counter:drop(MainGroup#set_view_group.ref_counter),
    couch_ref_counter:drop(RepGroup#set_view_group.ref_counter),
    {MainGroup, RepGroup}.
159
160
%% Before removal the replica group should track partitions [8 .. 63]
%% as passive, with no active and no cleanup partitions.
verify_group_info_before_replica_removal() ->
    etap:diag("Verifying replica group info before removing replica partitions"),
    RepGroupInfo = get_replica_group_info(),
    Expectations = [
        {active_partitions, [],
            "Replica group has [ ] as active partitions"},
        {passive_partitions, lists:seq(8, 63),
            "Replica group has [ 8 .. 63 ] as passive partitions"},
        {cleanup_partitions, [],
            "Replica group has [ ] as cleanup partitions"}
    ],
    lists:foreach(
        fun({Key, Expected, Desc}) ->
            etap:is(couch_util:get_value(Key, RepGroupInfo), Expected, Desc)
        end,
        Expectations).
176
177
%% After removal the replica group must have no active/passive
%% partitions. The removed partitions are either still pending cleanup
%% (empty cleanup history) or already cleaned up (non-empty history) —
%% either way, none of the main partitions [0 .. 7] may be scheduled
%% for cleanup.
verify_group_info_after_replica_removal() ->
    etap:diag("Verifying replica group info after removing replica partitions"),
    RepGroupInfo = get_replica_group_info(),
    etap:is(couch_util:get_value(active_partitions, RepGroupInfo), [],
        "Replica group has [ ] as active partitions"),
    etap:is(couch_util:get_value(passive_partitions, RepGroupInfo), [],
        "Replica group has [ ] as passive partitions"),
    CleanupParts = couch_util:get_value(cleanup_partitions, RepGroupInfo),
    {Stats} = couch_util:get_value(stats, RepGroupInfo),
    case couch_util:get_value(cleanup_history, Stats) of
    [_ | _] ->
        %% Cleanup already ran, so nothing should be left pending.
        etap:is(
            length(CleanupParts),
            0,
            "Replica group has a right value for cleanup partitions");
    [] ->
        %% Cleanup hasn't run yet; the removed partitions must be pending.
        etap:is(
            length(CleanupParts) > 0,
            true,
           "Replica group has a right value for cleanup partitions")
    end,
    etap:is(
        ordsets:intersection(CleanupParts, lists:seq(0, 7)),
        [],
        "Replica group doesn't have any cleanup partition with ID in [ 0 .. 7 ]").
208
209
%% After compaction (which also does the cleanup) every partition set
%% of the replica group must be empty.
verify_group_info_after_replica_compact() ->
    etap:diag("Verifying replica group info after compaction"),
    RepGroupInfo = get_replica_group_info(),
    lists:foreach(
        fun({Key, Desc}) ->
            etap:is(couch_util:get_value(Key, RepGroupInfo), [], Desc)
        end,
        [
            {active_partitions, "Replica group has [ ] as active partitions"},
            {passive_partitions, "Replica group has [ ] as passive partitions"},
            {cleanup_partitions, "Replica group has [ ] as cleanup partitions"}
        ]).
225
226
%% Trigger a full update on the replica group and block until it has
%% finished, then confirm via the group stats that exactly one full
%% update happened.
wait_for_replica_full_update() ->
    etap:diag("Waiting for a full replica group update"),
    UpdateCountBefore = get_replica_updates_count(),
    MainGroupPid = couch_set_view:get_group_pid(
        mapreduce_view, test_set_name(), ddoc_id(), prod),
    %% The replica group is reached through the main group's server.
    {ok, ReplicaGroupPid} = gen_server:call(MainGroupPid, replica_pid, infinity),
    {ok, UpPid} = gen_server:call(ReplicaGroupPid, {start_updater, []}, infinity),
    case is_pid(UpPid) of
    true ->
        ok;
    false ->
        etap:bail("Updater was not triggered")
    end,
    %% Monitor the updater and wait for it to go down; noproc means it
    %% already exited before the monitor was set up.
    Ref = erlang:monitor(process, UpPid),
    receive
    {'DOWN', Ref, process, UpPid, {updater_finished, _}} ->
        ok;
    {'DOWN', Ref, process, UpPid, noproc} ->
        ok;
    {'DOWN', Ref, process, UpPid, Reason} ->
        etap:bail("Failure updating replica group: " ++ couch_util:to_list(Reason))
    after ?MAX_WAIT_TIME ->
        etap:bail("Timeout waiting for replica group update")
    end,
    %% The full_updates stat must have been bumped by exactly one.
    UpdateCountAfter = get_replica_updates_count(),
    case UpdateCountAfter == (UpdateCountBefore + 1) of
    true ->
        ok;
    false ->
        etap:bail("Updater was not triggered")
    end.
258
259
%% Convenience wrapper: read the full_updates counter from a freshly
%% fetched replica group info.
get_replica_updates_count() ->
    RepGroupInfo = get_replica_group_info(),
    get_replica_updates_count(RepGroupInfo).
262
263
%% Extract the full_updates counter from a replica group info proplist.
%% Crashes (badmatch) if the stat is missing or not an integer.
get_replica_updates_count(RepGroupInfo) ->
    {Stats} = couch_util:get_value(stats, RepGroupInfo),
    FullUpdates = couch_util:get_value(full_updates, Stats),
    true = is_integer(FullUpdates),
    FullUpdates.
269
270
%% Return the replica group's info proplist, unwrapped from the main
%% group's info (which embeds it under replica_group_info).
get_replica_group_info() ->
    {ok, MainInfo} = couch_set_view:get_group_info(
        mapreduce_view, test_set_name(), ddoc_id(), prod),
    {RepGroupInfo} = couch_util:get_value(replica_group_info, MainInfo),
    RepGroupInfo.
276
277
%% Current on-disk size of the replica index file, asserted to be a
%% non-negative integer.
replica_index_disk_size() ->
    RepGroupInfo = get_replica_group_info(),
    DiskSize = couch_util:get_value(disk_size, RepGroupInfo),
    true = is_integer(DiskSize) andalso (DiskSize >= 0),
    DiskSize.
284
285
%% Create the design document, load num_docs() documents spread
%% sequentially over all partition databases, and define the set view
%% group with partitions [0 .. 7] active and the replica index enabled.
populate_set() ->
    couch_set_view:cleanup_index_files(mapreduce_view, test_set_name()),
    etap:diag("Populating the " ++ integer_to_list(num_set_partitions()) ++
        " databases with " ++ integer_to_list(num_docs()) ++ " documents"),
    %% Design document with a single map/reduce view keyed by doc ID.
    DDoc = {[
        {<<"meta">>, {[{<<"id">>, ddoc_id()}]}},
        {<<"json">>, {[
        {<<"language">>, <<"javascript">>},
        {<<"views">>, {[
            {<<"test">>, {[
                {<<"map">>, <<"function(doc, meta) { emit(meta.id, null); }">>},
                {<<"reduce">>, <<"_count">>}
            ]}}
        ]}}
        ]}}
    ]},
    ok = couch_set_view_test_util:update_ddoc(test_set_name(), DDoc),
    %% Documents doc1 .. docN, each carrying its index as "value".
    DocList = lists:map(
        fun(I) ->
            {[
                {<<"meta">>, {[{<<"id">>, iolist_to_binary(["doc", integer_to_list(I)])}]}},
                {<<"json">>, {[
                    {<<"value">>, I}
                ]}}
            ]}
        end,
        lists:seq(1, num_docs())),
    ok = couch_set_view_test_util:populate_set_sequentially(
        test_set_name(),
        lists:seq(0, num_set_partitions() - 1),
        DocList),
    etap:diag("Configuring set view with partitions [0 .. 7] as active"),
    %% use_replica_index is the key setting for this test: it enables
    %% the replica group that will later be compacted.
    Params = #set_view_params{
        max_partitions = num_set_partitions(),
        active_partitions = lists:seq(0, 7),
        passive_partitions = [],
        use_replica_index = true
    },
    ok = couch_set_view:define_group(
        mapreduce_view, test_set_name(), ddoc_id(), Params).
326