1#!/usr/bin/env escript
%% -*- Mode: Erlang; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*-
3%%! -smp enable
4
5% Licensed under the Apache License, Version 2.0 (the "License"); you may not
6% use this file except in compliance with the License. You may obtain a copy of
7% the License at
8%
9%   http://www.apache.org/licenses/LICENSE-2.0
10%
11% Unless required by applicable law or agreed to in writing, software
12% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14% License for the specific language governing permissions and limitations under
15% the License.
16
17-include_lib("couch_set_view/include/couch_set_view.hrl").
18
% Name of the database set every step of this test operates on.
test_set_name() ->
    <<"couch_test_set_monitor_partition_updates">>.

% Number of partition databases created for the set; also used as the
% group's max_partitions.
num_set_partitions() ->
    64.

% Id of the design document that defines the tested view.
ddoc_id() ->
    <<"_design/test">>.

% Number of documents loaded by the initial population. Written as a multiple
% of num_set_partitions() so the "keep it a multiple" rule holds by construction.
num_docs() ->
    125 * num_set_partitions().
23
% ?etap_match(Got, Expected, Desc)
%
% etap assertion that passes when Got matches the pattern Expected.
% Expected is spliced in as a case clause pattern, so it may contain `_`
% wildcards (e.g. {error, _}). Desc is the test description handed to
% etap:fun_is/3. The fun parameter has a deliberately unusual name so that
% it is unlikely to collide with variables used at the call site.
-define(etap_match(Got, Expected, Desc),
        etap:fun_is(
            fun(EtapMatchGot__) ->
                case EtapMatchGot__ of
                    Expected -> true;
                    _ -> false
                end
            end,
            Got,
            Desc)).
28
29
% escript entry point.
%
% Sets up the code path, announces 8 planned assertions to etap and runs
% test/0. Anything other than a plain `ok` from test/0 (including a caught
% throw, error or exit) is reported as a diagnostic and aborts the run via
% etap:bail/1.
main(_) ->
    test_util:init_code_path(),

    etap:plan(8),
    Outcome = (catch test()),
    finish_tests(Outcome),
    ok.

% Closes the etap run: normal end on `ok`, diagnostic + bail otherwise.
finish_tests(ok) ->
    etap:end_tests();
finish_tests(Failure) ->
    etap:diag(io_lib:format("Test died abnormally: ~p", [Failure])),
    etap:bail(Failure).
42
43
% Test body: checks the notifications delivered by
% couch_set_view:monitor_partition_update/4.
%
% Steps, in order (8 etap assertions in total):
%   1. Create the set, load num_docs() documents and set the group's
%      timeout to infinity.
%   2. Monitor partitions 10 and 33: no notification may arrive within 1s
%      because the index has not been built yet.
%   3. Monitoring partition 60 must yield {error, _} (per create_set/0 it is
%      neither active nor passive).
%   4. Trigger an index build; both pending monitors must then receive
%      `updated`.
%   5. Monitoring partition 127 must yield {error, _} (outside 0 .. 63).
%   6. Monitoring the already indexed partition 12 must notify immediately.
%   7. Monitoring partition 20 and then marking it for cleanup must deliver
%      `marked_for_cleanup`.
%   8. Delete the databases and stop the server.
%
% Returns `ok`; any failed match crashes and is reported by main/1.
test() ->
    couch_set_view_test_util:start_server(test_set_name()),

    create_set(),
    ValueGenFun1 = fun(I) -> I end,
    update_documents(0, num_docs(), ValueGenFun1),

    % Disable automatic updates when there are clients monitoring partition updates
    GroupPid = couch_set_view:get_group_pid(
        mapreduce_view, test_set_name(), ddoc_id(), prod),
    ok = gen_server:call(GroupPid, {set_timeout, infinity}, infinity),

    Ref1 = monitor_partition(10),
    etap:is(any_notification(Ref1, 1000), undefined,
            "Didn't got any partition 10 updated notification"),

    Ref2 = monitor_partition(33),
    etap:is(any_notification(Ref2, 1000), undefined,
            "Didn't got any partition 33 updated notification"),

    Ref3 = try_monitor_partition(60),
    ?etap_match(Ref3, {error, _},
                "Got error when asking to monitor partition 60 update"
                " (not in active nor passive set)"),

    % build index
    _ = get_group_snapshot(),

    etap:is(got_notification(Ref1, updated, 5000), true,
            "Got update notification for partition 10"),
    etap:is(got_notification(Ref2, updated, 5000), true,
            "Got update notification for partition 33"),

    Ref4 = try_monitor_partition(127),

    wait_updater_finished(),

    ?etap_match(Ref4, {error, _},
                "Got error when asking to monitor partition 127 update"
                " (not in range 0 .. 63)"),

    % Timeout 0: the notification must already be in the mailbox when
    % monitor_partition_update/4 returns.
    Ref5 = monitor_partition(12),
    etap:is(got_notification(Ref5, updated, 0), true,
            "Got immediate notification for partition 12"),

    update_documents(num_docs(), num_set_partitions(), ValueGenFun1),

    Ref6 = monitor_partition(20),
    ok = couch_set_view:set_partition_states(
           mapreduce_view, test_set_name(), ddoc_id(), [], [], [20]),
    etap:is(got_notification(Ref6, marked_for_cleanup, 3000), true,
            "Got cleanup notification for partition 20"),

    couch_set_view_test_util:delete_set_dbs(test_set_name(), num_set_partitions()),
    couch_set_view_test_util:stop_server(),
    ok.


% Asks the test group to notify this process about updates of partition PartId.
monitor_partition(PartId) ->
    couch_set_view:monitor_partition_update(
        mapreduce_view, test_set_name(), ddoc_id(), PartId).


% Same as monitor_partition/1, but a thrown term is returned instead of
% propagating (only `throw` is caught; errors and exits still crash).
try_monitor_partition(PartId) ->
    try
        monitor_partition(PartId)
    catch throw:Thrown ->
        Thrown
    end.


% Returns `ok` if any {Ref, _} message arrives within Timeout ms, else `undefined`.
any_notification(Ref, Timeout) ->
    receive
    {Ref, _} ->
        ok
    after Timeout ->
        undefined
    end.


% Returns true if exactly {Ref, Event} arrives within Timeout ms, else false.
got_notification(Ref, Event, Timeout) ->
    receive
    {Ref, Event} ->
        true
    after Timeout ->
        false
    end.
148
149
% Requests a non-stale (stale = false) snapshot of the test group from its
% gen_server and returns it. The reply must be exactly {ok, Group, 0};
% anything else crashes with badmatch. Waits without a timeout.
get_group_snapshot() ->
    Request = #set_view_group_req{stale = false},
    Pid = couch_set_view:get_group_pid(
        mapreduce_view, test_set_name(), ddoc_id(), prod),
    {ok, Snapshot, 0} = gen_server:call(Pid, Request, infinity),
    Snapshot.
156
157
% Builds the fixture for the test:
%   - recreates the partition databases of the set from scratch and removes
%     old index files,
%   - stores a design document with one view ("view_1": emits meta.id ->
%     doc.value, reduced with _sum),
%   - defines the group with partitions 0..31 active and 32..47 passive,
%     without a replica index.
% Returns `ok`; a non-`ok` result from update_ddoc/define_group crashes.
create_set() ->
    SetName = test_set_name(),
    NumPartitions = num_set_partitions(),

    couch_set_view_test_util:delete_set_dbs(SetName, NumPartitions),
    couch_set_view_test_util:create_set_dbs(SetName, NumPartitions),
    couch_set_view:cleanup_index_files(mapreduce_view, SetName),
    etap:diag("Creating the set databases (# of partitions: " ++
        integer_to_list(NumPartitions) ++ ")"),

    % Design document, assembled inside out.
    View1 = {[
        {<<"map">>, <<"function(doc, meta) { emit(meta.id, doc.value); }">>},
        {<<"reduce">>, <<"_sum">>}
    ]},
    Json = {[
        {<<"language">>, <<"javascript">>},
        {<<"views">>, {[{<<"view_1">>, View1}]}}
    ]},
    Meta = {[{<<"id">>, ddoc_id()}]},
    DDoc = {[{<<"meta">>, Meta}, {<<"json">>, Json}]},
    ok = couch_set_view_test_util:update_ddoc(SetName, DDoc),

    etap:diag("Configuring set view with partitions [0 .. 31]"
              " as active and [32 .. 47] as passive"),
    GroupParams = #set_view_params{
        max_partitions = NumPartitions,
        active_partitions = lists:seq(0, 31),
        passive_partitions = lists:seq(32, 47),
        use_replica_index = false
    },
    ok = couch_set_view:define_group(
        mapreduce_view, SetName, ddoc_id(), GroupParams).
187
188
% Writes Count documents with ids StartId .. StartId + Count - 1 into the set.
%
% Each document gets the id doc_id(I) and a "value" field of ValueGenFun(I).
% The documents are tagged with I rem num_set_partitions() and stably sorted
% by that tag before being handed to populate_set_sequentially/3 together
% with the full partition list 0 .. num_set_partitions() - 1.
% Returns `ok`; any other result from the populate call crashes.
update_documents(StartId, Count, ValueGenFun) ->
    Banner = "Updating " ++ integer_to_list(Count) ++ " new documents",
    etap:diag(Banner),
    NumPartitions = num_set_partitions(),
    Tagged = [
        {Id rem NumPartitions, {[
            {<<"meta">>, {[{<<"id">>, doc_id(Id)}]}},
            {<<"json">>, {[
                {<<"value">>, ValueGenFun(Id)}
            ]}}
        ]}}
        || Id <- lists:seq(StartId, StartId + Count - 1)
    ],
    % keysort is stable, so documents of one partition keep ascending id order.
    Sorted = lists:keysort(1, Tagged),
    Docs = lists:map(fun({_Tag, Doc}) -> Doc end, Sorted),
    AllPartitions = lists:seq(0, NumPartitions - 1),
    ok = couch_set_view_test_util:populate_set_sequentially(
        test_set_name(), AllPartitions, Docs).
206
207
% Document id for the integer I: the binary "doc_" followed by I in decimal,
% zero-padded on the left to 8 characters (e.g. 42 -> <<"doc_00000042">>).
doc_id(I) ->
    Formatted = io_lib:format("doc_~8..0b", [I]),
    iolist_to_binary(Formatted).
210
211
% Blocks until the test group's updater process has terminated.
%
% Asks the group gen_server for its updater (`updater_pid` call). If the
% reply carries a pid, that process is monitored and the function waits up to
% 320 seconds for its 'DOWN' message; on timeout a diagnostic is printed and
% the test carries on. If the reply does not carry a pid (presumably no
% updater is running - confirm against couch_set_view_group), it returns
% immediately.
wait_updater_finished() ->
    GroupPid = couch_set_view:get_group_pid(
        mapreduce_view, test_set_name(), ddoc_id(), prod),
    {ok, UpPid} = gen_server:call(GroupPid, updater_pid),
    case is_pid(UpPid) of
    true ->
        Ref = erlang:monitor(process, UpPid),
        receive
        {'DOWN', Ref, process, UpPid, _} ->
            ok
        after 320000 ->
            % Do not leave the monitor behind: without this a late 'DOWN'
            % would be delivered to the test process after we stopped
            % waiting. `flush` also drops one that raced with the timeout.
            erlang:demonitor(Ref, [flush]),
            etap:diag("Timeout waiting for updater to finish")
        end;
    false ->
        ok
    end.
228