1#!/usr/bin/env escript
2%% -*- Mode: Erlang; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
3%%! -smp enable
4
5% Licensed under the Apache License, Version 2.0 (the "License"); you may not
6% use this file except in compliance with the License. You may obtain a copy of
7% the License at
8%
9%   http://www.apache.org/licenses/LICENSE-2.0
10%
11% Unless required by applicable law or agreed to in writing, software
12% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14% License for the specific language governing permissions and limitations under
15% the License.
16
17-include_lib("couch_set_view/include/couch_set_view.hrl").
18
19
% Name of the set that this test creates, populates and queries.
test_set_name() ->
    <<"couch_test_ryow_query">>.
% Number of partitions the test set is split into.
num_set_partitions() ->
    64.
% Id of the design document that holds the view under test.
ddoc_id() ->
    <<"_design/test">>.
% Number of documents that are added and then repeatedly updated.
num_docs() ->
    5000.
24
% Timeout, in milliseconds, used both for the start_updater call to the group
% process and for waiting on the stale=false query response.
-define(TIMEOUT, 600000).
26
% escript entry point. Sets up the code path, plans a single etap assertion
% and runs test/0. Anything other than a plain `ok` from test/0 (including a
% caught throw, error or exit) is reported through etap:diag/1 and etap:bail/1.
main(_) ->
    test_util:init_code_path(),

    etap:plan(1),
    Outcome = (catch test()),
    case Outcome of
        ok ->
            etap:end_tests();
        Failure ->
            Report = io_lib:format("Test died abnormally: ~p", [Failure]),
            etap:diag(Report),
            etap:bail(Failure)
    end,
    ok.
39
% Exercises the read-your-own-writes (RYOW) property of stale=false queries.
%
% Documents are first added with value 0 and queried once, then rewritten with
% value 1. An updater is started with the `pause` option, a stale=false query
% is issued from a separate process, and the same documents are rewritten
% three more times (values 2, 3 and 4) before the updater is told to continue.
% The rows handed back by the query process are checked by
% verify_view_results/1.
%
% The query process is monitored so that a crash inside it is reported
% immediately instead of only after ?TIMEOUT has elapsed.
%
% Returns ok. Failures are reported through etap.
test() ->
    etap:diag("Testing stale=false query for RYOW property"),
    couch_set_view_test_util:start_server(test_set_name()),
    etap:diag("Adding documents with value = 0"),
    create_set(),
    GroupPid = couch_set_view:get_group_pid(
        mapreduce_view, test_set_name(), ddoc_id(), prod),
    add_documents(0, num_docs()),
    % Initial stale=false query, issued while every document still has value 0.
    couch_set_view_test_util:query_view(
        test_set_name(), ddoc_id(), <<"test">>, ["stale=false"]),

    etap:diag("Updating documents with value = 1"),
    update_documents(0, num_docs(), fun(_I) -> 1 end),

    % Start the updater with the `pause` option; it is sent `continue` below,
    % once the query has been issued and more updates have been written.
    {ok, UpdaterPid} = gen_server:call(GroupPid, {start_updater, [pause]}, ?TIMEOUT),
    Parent = self(),
    % Run the query in its own process so this process can keep writing
    % updates while the query is outstanding.
    {QueryPid, QueryRef} = spawn_monitor(fun() ->
        setup_query_env(),
        {ok, {ViewResults}} = couch_set_view_test_util:query_view(
            test_set_name(), ddoc_id(), <<"test">>, ["stale=false"]),
        Parent ! {query_response, self(), ViewResults}
        end),


    etap:diag("Updating same documents subsequently multiple times"),
    update_documents(0, num_docs(), fun(_I) -> 2 end),
    update_documents(0, num_docs(), fun(_I) -> 3 end),
    update_documents(0, num_docs(), fun(_I) -> 4 end),


    % At least two snapshots are required for the updater to do checkpointing
    couch_dcp_fake_server:set_items_per_snapshot(2),

    case UpdaterPid of
    nil ->
        ok;
    _ ->
        UpdaterPid ! continue
    end,

    receive
    {query_response, QueryPid, ViewResults} ->
        % The response is sent before the query process exits, so it is
        % always seen ahead of the 'DOWN' message; drop the monitor and any
        % 'DOWN' message it already delivered.
        erlang:demonitor(QueryRef, [flush]),
        verify_view_results(ViewResults);
    {'DOWN', QueryRef, process, QueryPid, Reason} ->
        etap:bail(lists:flatten(io_lib:format(
            "Query process exited before returning results: ~p", [Reason])))
    after ?TIMEOUT ->
        etap:bail("Timed out waiting for stale=false query results")
    end,

    shutdown_group(),
    couch_set_view_test_util:stop_server(),
    ok.
90
91
% Stores the HTTP address and port of the running server in the calling
% process's dictionary, under the keys `addr` and `port`. An IPv6 address is
% wrapped in square brackets. Always returns ok.
setup_query_env() ->
    Addr = case misc:is_ipv6() of
        true ->
            "[" ++ couch_config:get("httpd", "ip6_bind_address", "::1") ++ "]";
        false ->
            couch_config:get("httpd", "ip4_bind_address", "127.0.0.1")
    end,
    put(addr, Addr),
    Port = mochiweb_socket_server:get(couch_httpd, port),
    put(port, integer_to_list(Port)),
    ok.
102
103
% Asserts, through etap:is/3, that no row in ViewResults still carries the
% initial value 0. ViewResults is a property list whose <<"rows">> entry is a
% list of {RowProps} terms.
verify_view_results(ViewResults) ->
    HasInitialValue = fun({RowProps}) ->
        couch_util:get_value(<<"value">>, RowProps) =:= 0
    end,
    ResultRows = couch_util:get_value(<<"rows">>, ViewResults),
    SawInitialValue = lists:any(HasInitialValue, ResultRows),
    etap:is(SawInitialValue, false,
        "Expected each row value in stale=false query to be 1,2,3 or 4").
112
113
% Recreates the test set from scratch: drops and recreates the partition
% databases, cleans up index files, stores a design document with a single
% "test" view that emits (meta.id, doc.value), and defines the view group with
% every partition active, none passive and no replica index.
create_set() ->
    SetName = test_set_name(),
    NumPartitions = num_set_partitions(),
    couch_set_view_test_util:delete_set_dbs(SetName, NumPartitions),
    couch_set_view_test_util:create_set_dbs(SetName, NumPartitions),
    couch_set_view:cleanup_index_files(mapreduce_view, SetName),
    etap:diag("Creating the set databases (# of partitions: " ++
        integer_to_list(NumPartitions) ++ ")"),
    MapSource = <<"function(doc, meta) { emit(meta.id, doc.value); }">>,
    Views = {[
        {<<"test">>, {[{<<"map">>, MapSource}]}}
    ]},
    DDoc = {[
        {<<"meta">>, {[{<<"id">>, ddoc_id()}]}},
        {<<"json">>, {[{<<"views">>, Views}]}}
    ]},
    ok = couch_set_view_test_util:update_ddoc(SetName, DDoc),
    LastPartition = NumPartitions - 1,
    etap:diag(io_lib:format(
        "Configuring set view with partitions [0 .. ~p] as active",
        [LastPartition])),
    Params = #set_view_params{
        max_partitions = NumPartitions,
        active_partitions = lists:seq(0, LastPartition),
        passive_partitions = [],
        use_replica_index = false
    },
    ok = couch_set_view:define_group(
        mapreduce_view, SetName, ddoc_id(), Params).
142
143
% Adds Count documents, starting at id StartId, each with value 0.
add_documents(StartId, Count) ->
    ZeroValue = fun(_I) -> 0 end,
    add_documents(StartId, Count, ZeroValue).
146
% Adds Count documents with ids StartId .. StartId + Count - 1. The value of
% document I is ValueGenFun(I). Documents are ordered by I rem
% num_set_partitions() (stable sort) before being handed to
% populate_set_sequentially/3 together with the full list of partition ids.
add_documents(StartId, Count, ValueGenFun) ->
    etap:diag("Adding " ++ integer_to_list(Count) ++ " new documents"),
    NumPartitions = num_set_partitions(),
    Tagged = [
        {Id rem NumPartitions, {[
            {<<"meta">>, {[{<<"id">>, doc_id(Id)}]}},
            {<<"json">>, {[{<<"value">>, ValueGenFun(Id)}]}}
        ]}}
        || Id <- lists:seq(StartId, StartId + Count - 1)
    ],
    % keysort is stable, so documents sharing a tag keep their id order.
    {_Tags, SortedDocs} = lists:unzip(lists:keysort(1, Tagged)),
    ok = couch_set_view_test_util:populate_set_sequentially(
        test_set_name(),
        lists:seq(0, NumPartitions - 1),
        SortedDocs).
164
165
% Rewrites documents StartId .. StartId + NumDocs - 1 so that document I gets
% the value ValueGenFun(I). Every partition database is opened up front, the
% documents are grouped by I rem num_set_partitions() and written to the
% database opened for that number, one couch_db:update_docs/3 call per group,
% and finally all databases are closed. Returns ok.
update_documents(StartId, NumDocs, ValueGenFun) ->
    etap:diag("About to update " ++ integer_to_list(NumDocs) ++ " documents"),
    NumPartitions = num_set_partitions(),
    OpenDb = fun(PartId) ->
        {ok, Db} = couch_set_view_test_util:open_set_db(test_set_name(), PartId),
        {PartId, Db}
    end,
    Dbs = dict:from_list(
        [OpenDb(PartId) || PartId <- lists:seq(0, NumPartitions - 1)]),
    GroupDoc = fun(Id, Grouped) ->
        Doc = couch_doc:from_json_obj({[
            {<<"meta">>, {[{<<"id">>, doc_id(Id)}]}},
            {<<"json">>, {[{<<"value">>, ValueGenFun(Id)}]}}
        ]}),
        % Prepend to the group's list, or start a new list for this group.
        orddict:update(
            Id rem NumPartitions, fun(Docs) -> [Doc | Docs] end, [Doc], Grouped)
    end,
    DocsByPartition = lists:foldl(
        GroupDoc, orddict:new(), lists:seq(StartId, StartId + NumDocs - 1)),
    lists:foreach(
        fun({PartId, PartDocs}) ->
            PartDb = dict:fetch(PartId, Dbs),
            ok = couch_db:update_docs(PartDb, PartDocs, [sort_docs])
        end,
        orddict:to_list(DocsByPartition)),
    etap:diag("Updated " ++ integer_to_list(NumDocs) ++ " documents"),
    ok = lists:foreach(
        fun({_PartId, OpenedDb}) -> ok = couch_db:close(OpenedDb) end,
        dict:to_list(Dbs)).
200
201
% Builds the binary document id for integer I: "doc_" followed by I printed
% in a field of width 8, padded with zeros.
doc_id(I) ->
    Formatted = io_lib:format("doc_~8..0b", [I]),
    iolist_to_binary(Formatted).
204
205
% Resets the fake DCP server, deletes the set databases and then waits up to
% 10 seconds for the view group process to go down, bailing out of the test
% run if it does not.
shutdown_group() ->
    couch_dcp_fake_server:reset(),
    Group = couch_set_view:get_group_pid(
        mapreduce_view, test_set_name(), ddoc_id(), prod),
    couch_set_view_test_util:delete_set_dbs(
        test_set_name(), num_set_partitions()),
    % The monitor is set up after the databases are deleted; if the group is
    % already gone by then, the 'DOWN' message is delivered right away.
    GroupMonitor = erlang:monitor(process, Group),
    receive
    {'DOWN', GroupMonitor, _Type, _Object, _Info} ->
        ok
    after 10000 ->
        etap:bail("Timeout waiting for group shutdown")
    end.
218