xref: /5.5.2/couchdb/test/etap/073-changes.t (revision 8be914dd)
1#!/usr/bin/env escript
2%% -*- erlang -*-
3%%! -smp enable
4
5% Licensed under the Apache License, Version 2.0 (the "License"); you may not
6% use this file except in compliance with the License. You may obtain a copy of
7% the License at
8%
9%   http://www.apache.org/licenses/LICENSE-2.0
10%
11% Unless required by applicable law or agreed to in writing, software
12% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14% License for the specific language governing permissions and limitations under
15% the License.
16
% Verify that the changes feed honours the built-in "_doc_ids" and "_design"
% filters: for normal and continuous feeds, in ascending and descending
% order, and when starting from a given "since" sequence number.
20
% User context record. In this file it is only used to pass the `_admin`
% role in the options given to couch_db:create/2, couch_db:open_int/2 and
% couch_server:delete/2.
% NOTE(review): presumably a local copy of the record of the same name from
% couch_db.hrl; the field order would have to stay in sync -- confirm.
-record(user_ctx, {name = null, roles = [], handler}).
26
% Arguments record handed to couch_changes:handle_changes/3.
% NOTE(review): presumably a local copy of the record of the same name from
% couch_db.hrl; field order and defaults must not drift -- confirm.
-record(changes_args, {
    % Feed type; the tests below use the default and "continuous".
    feed = "normal",
    % Fold direction; the tests below use fwd and rev.
    dir = fwd,
    % Sequence number the feed starts after.
    since = 0,
    limit = 1000000000000000,
    style = main_only,
    % heartbeat and timeout are both set to 10 by spawn_consumer/3.
    heartbeat,
    timeout,
    % Filter name; the tests below use "_doc_ids" and "_design".
    filter = "",
    filter_fun,
    filter_args = [],
    include_docs = false,
    conflicts = false,
    db_open_options = []
}).
42
% One collected changes row: document id, sequence number and deleted flag.
% Built by the consumer callback in spawn_consumer/3.
-record(row, {id, seq, deleted = false}).
48
49
% Name of the scratch database that every test case creates and deletes.
test_db_name() ->
    <<"couch_test_changes">>.
51
52
% escript entry point: sets up the code path, declares the etap plan, runs
% the suite and bails out through etap if the suite did not return ok.
main(_) ->
    test_util:init_code_path(),

    % 39 is the total number of etap:is/3 assertions in the four test cases.
    etap:plan(39),
    Outcome = (catch test()),
    case Outcome of
        ok ->
            etap:end_tests();
        _ ->
            Message = io_lib:format("Test died abnormally: ~p", [Outcome]),
            etap:diag(Message),
            etap:bail(Outcome)
    end,
    ok.
65
66
% Starts the CouchDB server supervisor, runs every test case in order and
% stops the server again. Returns ok.
test() ->
    couch_server_sup:start_link(test_util:config_files()),

    Cases = [
        fun test_by_doc_ids/0,
        fun test_by_doc_ids_with_since/0,
        fun test_by_doc_ids_continuous/0,
        fun test_design_docs_only/0
    ],
    lists:foreach(fun(Case) -> Case() end, Cases),

    couch_server_sup:stop(),
    ok.
77
78
% Checks the "_doc_ids" filter on a normal (one-shot) feed, first folding in
% ascending and then in descending order. Only doc3 and doc4 exist among the
% requested ids; doc9999 is never created.
test_by_doc_ids() ->
    {ok, Db} = create_db(test_db_name()),

    % doc3 is saved twice; the assertions below expect its row at seq 6.
    lists:foreach(
        fun(Id) -> ok = save_doc_meta(Db, {[{<<"id">>, Id}]}) end,
        [<<"doc1">>, <<"doc2">>, <<"doc3">>, <<"doc4">>, <<"doc5">>,
         <<"doc3">>, <<"doc6">>, <<"doc7">>, <<"doc8">>]),

    etap:diag("Folding changes in ascending order with _doc_ids filter"),
    ChangesArgs = #changes_args{
        filter = "_doc_ids"
    },
    DocIds = [<<"doc3">>, <<"doc4">>, <<"doc9999">>],
    Req = {json_req, {[{<<"doc_ids">>, DocIds}]}},
    Consumer = spawn_consumer(test_db_name(), ChangesArgs, Req),

    {Rows, LastSeq} = wait_finished(Consumer),
    % Re-open the database to read its current update sequence.
    {ok, Db2} = couch_db:open_int(test_db_name(), []),
    UpSeq = couch_db:get_update_seq(Db2),
    couch_db:close(Db2),
    etap:is(length(Rows), 2, "Received 2 changes rows"),
    etap:is(LastSeq, UpSeq, "LastSeq is same as database update seq number"),
    [#row{seq = Seq1, id = Id1}, #row{seq = Seq2, id = Id2}] = Rows,
    etap:is(Id1, <<"doc4">>, "First row is for doc doc4"),
    etap:is(Seq1, 4, "First row has seq 4"),
    etap:is(Id2, <<"doc3">>, "Second row is for doc doc3"),
    etap:is(Seq2, 6, "Second row has seq 6"),

    stop(Consumer),
    etap:diag("Folding changes in descending order with _doc_ids filter"),
    ChangesArgs2 = #changes_args{
        filter = "_doc_ids",
        dir = rev
    },
    Consumer2 = spawn_consumer(test_db_name(), ChangesArgs2, Req),

    {Rows2, LastSeq2} = wait_finished(Consumer2),
    etap:is(length(Rows2), 2, "Received 2 changes rows"),
    etap:is(LastSeq2, 4, "LastSeq is 4"),
    [#row{seq = Seq1_2, id = Id1_2}, #row{seq = Seq2_2, id = Id2_2}] = Rows2,
    % In descending order doc3 (seq 6) comes first; the descriptions now
    % state the sequence numbers that are actually asserted.
    etap:is(Id1_2, <<"doc3">>, "First row is for doc doc3"),
    etap:is(Seq1_2, 6, "First row has seq 6"),
    etap:is(Id2_2, <<"doc4">>, "Second row is for doc doc4"),
    etap:is(Seq2_2, 4, "Second row has seq 4"),

    stop(Consumer2),
    delete_db(Db).
131
132
% Checks the "_doc_ids" filter on a normal feed combined with a `since`
% sequence: since = 5 (one row expected), since = 6 (no rows expected) and,
% after a deletion of doc3 is saved, since = 9 (the deleted row expected).
test_by_doc_ids_with_since() ->
    {ok, Db} = create_db(test_db_name()),

    % doc3 is saved twice; the assertions below expect its row at seq 6.
    lists:foreach(
        fun(Id) -> ok = save_doc_meta(Db, {[{<<"id">>, Id}]}) end,
        [<<"doc1">>, <<"doc2">>, <<"doc3">>, <<"doc4">>, <<"doc5">>,
         <<"doc3">>, <<"doc6">>, <<"doc7">>, <<"doc8">>]),

    ChangesArgs = #changes_args{
        filter = "_doc_ids",
        since = 5
    },
    DocIds = [<<"doc3">>, <<"doc4">>, <<"doc9999">>],
    Req = {json_req, {[{<<"doc_ids">>, DocIds}]}},
    Consumer = spawn_consumer(test_db_name(), ChangesArgs, Req),

    {Rows, LastSeq} = wait_finished(Consumer),
    {ok, Db2} = couch_db:open_int(test_db_name(), []),
    UpSeq = couch_db:get_update_seq(Db2),
    couch_db:close(Db2),
    etap:is(LastSeq, UpSeq, "LastSeq is same as database update seq number"),
    etap:is(length(Rows), 1, "Received 1 changes rows"),
    [#row{seq = Seq1, id = Id1}] = Rows,
    etap:is(Id1, <<"doc3">>, "First row is for doc doc3"),
    etap:is(Seq1, 6, "First row has seq 6"),

    stop(Consumer),

    ChangesArgs2 = #changes_args{
        filter = "_doc_ids",
        since = 6
    },
    Consumer2 = spawn_consumer(test_db_name(), ChangesArgs2, Req),

    {Rows2, LastSeq2} = wait_finished(Consumer2),
    {ok, Db3} = couch_db:open_int(test_db_name(), []),
    UpSeq2 = couch_db:get_update_seq(Db3),
    couch_db:close(Db3),
    etap:is(LastSeq2, UpSeq2, "LastSeq is same as database update seq number"),
    etap:is(length(Rows2), 0, "Received 0 change rows"),

    stop(Consumer2),

    ok = save_doc_meta(
        Db,
        {[{<<"id">>, <<"doc3">>}, {<<"deleted">>, true}]}),

    ChangesArgs3 = #changes_args{
        filter = "_doc_ids",
        since = 9
    },
    Consumer3 = spawn_consumer(test_db_name(), ChangesArgs3, Req),

    {Rows3, LastSeq3} = wait_finished(Consumer3),
    {ok, Db4} = couch_db:open_int(test_db_name(), []),
    UpSeq3 = couch_db:get_update_seq(Db4),
    couch_db:close(Db4),
    etap:is(LastSeq3, UpSeq3, "LastSeq is same as database update seq number"),
    etap:is(length(Rows3), 1, "Received 1 changes rows"),
    % Actual value first, expected second, like every other etap:is/3 call
    % in this file, so a failure reports Got/Expected the right way round.
    etap:is(
        Rows3,
        [#row{seq = LastSeq3, id = <<"doc3">>, deleted = true}],
        "Received row with doc3 deleted"),

    stop(Consumer3),

    delete_db(Db).
205
206
% Checks the "_doc_ids" filter on a continuous feed. The consumer is paused
% before its rows are inspected and unpaused again after new documents have
% been saved, so each inspection sees a stable row list.
test_by_doc_ids_continuous() ->
    {ok, Db} = create_db(test_db_name()),
    Save = fun(Id) -> ok = save_doc_meta(Db, {[{<<"id">>, Id}]}) end,

    % doc3 is saved twice; the assertions below expect its row at seq 6.
    lists:foreach(Save, [
        <<"doc1">>, <<"doc2">>, <<"doc3">>, <<"doc4">>, <<"doc5">>,
        <<"doc3">>, <<"doc6">>, <<"doc7">>, <<"doc8">>
    ]),

    Args = #changes_args{feed = "continuous", filter = "_doc_ids"},
    WantedIds = [<<"doc3">>, <<"doc4">>, <<"doc9999">>],
    Req = {json_req, {[{<<"doc_ids">>, WantedIds}]}},
    Consumer = spawn_consumer(test_db_name(), Args, Req),

    pause(Consumer),
    Rows = get_rows(Consumer),

    etap:is(length(Rows), 2, "Received 2 changes rows"),
    [#row{seq = Seq1, id = Id1}, #row{seq = Seq2, id = Id2}] = Rows,
    etap:is(Id1, <<"doc4">>, "First row is for doc doc4"),
    etap:is(Seq1, 4, "First row has seq 4"),
    etap:is(Id2, <<"doc3">>, "Second row is for doc doc3"),
    etap:is(Seq2, 6, "Second row has seq 6"),

    % Documents outside the requested id list must not produce rows.
    clear_rows(Consumer),
    lists:foreach(Save, [<<"doc9">>, <<"doc10">>]),
    unpause(Consumer),
    pause(Consumer),
    etap:is(get_rows(Consumer), [], "No new rows"),

    % doc4 is saved twice here; two new rows in total are expected.
    lists:foreach(Save, [
        <<"doc4">>, <<"doc11">>, <<"doc4">>, <<"doc12">>, <<"doc3">>
    ]),
    unpause(Consumer),
    pause(Consumer),

    NewRows = get_rows(Consumer),
    etap:is(length(NewRows), 2, "Received 2 new rows"),
    [Row14, Row16] = NewRows,
    etap:is(Row14#row.seq, 14, "First row has seq 14"),
    etap:is(Row14#row.id, <<"doc4">>, "First row is for doc doc4"),
    etap:is(Row16#row.seq, 16, "Second row has seq 16"),
    etap:is(Row16#row.id, <<"doc3">>, "Second row is for doc doc3"),

    clear_rows(Consumer),
    Save(<<"doc3">>),
    unpause(Consumer),
    pause(Consumer),
    etap:is(get_rows(Consumer), [#row{seq = 17, id = <<"doc3">>}],
        "Got row for seq 17, doc doc3"),

    unpause(Consumer),
    stop(Consumer),
    delete_db(Db).
271
272
% Checks the "_design" filter on a normal feed: only the design document is
% reported, both after it is created and after its deletion is saved.
test_design_docs_only() ->
    DbName = test_db_name(),
    {ok, Db} = create_db(DbName),

    lists:foreach(
        fun(Id) -> ok = save_doc_meta(Db, {[{<<"id">>, Id}]}) end,
        [<<"doc1">>, <<"doc2">>, <<"_design/foo">>]),

    Args = #changes_args{filter = "_design"},
    Consumer = spawn_consumer(DbName, Args, {json, null}),

    {Rows, LastSeq} = wait_finished(Consumer),
    {ok, Db2} = couch_db:open_int(DbName, []),
    UpSeq = couch_db:get_update_seq(Db2),
    couch_db:close(Db2),

    etap:is(LastSeq, UpSeq, "LastSeq is same as database update seq number"),
    etap:is(length(Rows), 1, "Received 1 changes rows"),
    etap:is(Rows, [#row{seq = 3, id = <<"_design/foo">>}], "Received row with ddoc"),

    stop(Consumer),

    % The deletion of the design document is saved through a handle opened
    % with the _admin role.
    AdminOptions = [{user_ctx, #user_ctx{roles = [<<"_admin">>]}}],
    {ok, Db3} = couch_db:open_int(DbName, AdminOptions),
    DeletedDdoc = {[{<<"id">>, <<"_design/foo">>}, {<<"deleted">>, true}]},
    ok = save_doc_meta(Db3, DeletedDdoc),

    Consumer2 = spawn_consumer(DbName, Args, {json, null}),

    {Rows2, LastSeq2} = wait_finished(Consumer2),
    % Exactly one update was saved since UpSeq was read.
    UpSeq2 = UpSeq + 1,
    couch_db:close(Db3),

    etap:is(LastSeq2, UpSeq2, "LastSeq is same as database update seq number"),
    etap:is(length(Rows2), 1, "Received 1 changes rows"),
    etap:is(
        Rows2,
        [#row{seq = 4, id = <<"_design/foo">>, deleted = true}],
        "Received row with deleted ddoc"),

    stop(Consumer2),
    delete_db(Db).
318
319
% Wraps JsonMeta under a <<"meta">> key, converts the resulting JSON object
% into a document and saves it in Db. Returns whatever couch_db:update_doc/3
% returns (callers in this file match it against `ok`).
save_doc_meta(Db, JsonMeta) ->
    Json = {[{<<"meta">>, JsonMeta}]},
    couch_db:update_doc(Db, couch_doc:from_json_obj(Json), []).
323
324
% Asks the consumer process for the rows it has collected so far and returns
% them. Bails out of the test run if no reply arrives within 3 seconds.
get_rows(Consumer) ->
    Tag = make_ref(),
    Consumer ! {get_rows, Tag},
    receive
        {rows, Tag, Collected} -> Collected
    after 3000 ->
        etap:bail("Timeout getting rows from consumer")
    end.
334
335
% Tells the consumer process to drop the rows it has collected so far.
% Returns ok once the consumer acknowledges; bails out of the test run if no
% acknowledgement arrives within 3 seconds.
clear_rows(Consumer) ->
    Tag = make_ref(),
    Consumer ! {reset, Tag},
    receive
        {ok, Tag} -> ok
    after 3000 ->
        etap:bail("Timeout clearing consumer rows")
    end.
345
346
% Tells the consumer process to stop. Returns ok once the consumer
% acknowledges; bails out of the test run if no acknowledgement arrives
% within 3 seconds.
stop(Consumer) ->
    Tag = make_ref(),
    Consumer ! {stop, Tag},
    receive
        {ok, Tag} -> ok
    after 3000 ->
        etap:bail("Timeout stopping consumer")
    end.
356
357
% Asks the consumer process to pause. Returns ok once the consumer reports
% that it is paused; bails out of the test run if no such report arrives
% within 3 seconds.
pause(Consumer) ->
    Tag = make_ref(),
    Consumer ! {pause, Tag},
    receive
        {paused, Tag} -> ok
    after 3000 ->
        etap:bail("Timeout pausing consumer")
    end.
367
368
% Tells a paused consumer process to continue. Returns ok once the consumer
% acknowledges; bails out of the test run if no acknowledgement arrives
% within 3 seconds.
unpause(Consumer) ->
    Tag = make_ref(),
    Consumer ! {continue, Tag},
    receive
        {ok, Tag} -> ok
    after 3000 ->
        etap:bail("Timeout unpausing consumer")
    end.
378
379
% Waits up to 30 seconds for a {consumer_finished, Rows, LastSeq} message
% and returns {Rows, LastSeq}. The consumer argument is not used: the
% message carries no sender, so any such message in the mailbox is accepted.
wait_finished(_Consumer) ->
    receive
        {consumer_finished, FinalRows, EndSeq} -> {FinalRows, EndSeq}
    after 30000 ->
        etap:bail("Timeout waiting for consumer to finish")
    end.
387
388
% Spawns a process that runs a changes feed over DbName with the given
% arguments and request, collecting every change as a #row{} record.
% Returns the pid of the spawned process.
%
% The calling process becomes the consumer's parent: it receives
% {consumer_finished, Rows, LastSeq} when the feed reports {stop, LastSeq},
% and it can drive the consumer through get_rows/1, clear_rows/1, pause/1,
% unpause/1 and stop/1.
spawn_consumer(DbName, ChangesArgs0, Req) ->
    Owner = self(),
    Collect = fun
        ({change, {Props}, _}, _, Rows) ->
            Row = #row{
                id = couch_util:get_value(<<"id">>, Props),
                seq = couch_util:get_value(<<"seq">>, Props),
                deleted = couch_util:get_value(<<"deleted">>, Props, false)
            },
            % Rows are accumulated newest-first and reversed when reported.
            [Row | Rows];
        ({stop, EndSeq}, _, Rows) ->
            Owner ! {consumer_finished, lists:reverse(Rows), EndSeq},
            % Keep serving get_rows requests until the parent sends stop.
            stop_loop(Owner, Rows);
        (_, _, Rows) ->
            % Any other feed event is used as an opportunity to serve
            % pending control messages from the parent.
            maybe_pause(Owner, Rows)
    end,
    Run = fun() ->
        {ok, Db} = couch_db:open_int(DbName, []),
        Args = ChangesArgs0#changes_args{timeout = 10, heartbeat = 10},
        Feed = couch_changes:handle_changes(Args, Req, Db),
        try
            Feed({Collect, []})
        catch
            % maybe_pause/2 and pause_loop/2 throw {stop, Acc} to abort
            % the feed when the parent asks the consumer to stop.
            throw:{stop, _} -> ok
        end,
        catch couch_db:close(Db)
    end,
    spawn(Run).
413
414
% Serves control messages that are already waiting in the mailbox, without
% blocking, and returns the (possibly reset) accumulator once none is left.
%   {get_rows, Ref} - replies {rows, Ref, Rows} with rows in arrival order
%   {reset, Ref}    - replies {ok, Ref} and continues with an empty list
%   {pause, Ref}    - replies {paused, Ref} and blocks in pause_loop/2
%   {stop, Ref}     - replies {ok, Ref} and throws {stop, Acc}
maybe_pause(Parent, Acc) ->
    receive
        {stop, Tag} ->
            Parent ! {ok, Tag},
            throw({stop, Acc});
        {pause, Tag} ->
            Parent ! {paused, Tag},
            pause_loop(Parent, Acc);
        {reset, Tag} ->
            Parent ! {ok, Tag},
            maybe_pause(Parent, []);
        {get_rows, Tag} ->
            Parent ! {rows, Tag, lists:reverse(Acc)},
            maybe_pause(Parent, Acc)
    after 0 ->
        % Nothing pending: hand the accumulator back immediately.
        Acc
    end.
432
433
% Blocks the consumer while it is paused, serving control messages until it
% is told to continue or stop.
%   {continue, Ref} - replies {ok, Ref} and returns the accumulator
%   {get_rows, Ref} - replies {rows, Ref, Rows} with rows in arrival order
%   {reset, Ref}    - replies {ok, Ref} and continues with an empty list
%   {stop, Ref}     - replies {ok, Ref} and throws {stop, Acc}
pause_loop(Parent, Acc) ->
    receive
        {continue, Tag} ->
            Parent ! {ok, Tag},
            Acc;
        {get_rows, Tag} ->
            Parent ! {rows, Tag, lists:reverse(Acc)},
            pause_loop(Parent, Acc);
        {reset, Tag} ->
            Parent ! {ok, Tag},
            pause_loop(Parent, []);
        {stop, Tag} ->
            Parent ! {ok, Tag},
            throw({stop, Acc})
    end.
449
450
% Entered after the feed has reported {stop, LastSeq}: keeps answering
% get_rows requests until the parent sends stop, then acknowledges it and
% returns the accumulator.
stop_loop(Parent, Acc) ->
    receive
        {stop, Tag} ->
            Parent ! {ok, Tag},
            Acc;
        {get_rows, Tag} ->
            Parent ! {rows, Tag, lists:reverse(Acc)},
            stop_loop(Parent, Acc)
    end.
460
461
% Creates the database DbName with the _admin role and the `overwrite`
% option. Returns the result of couch_db:create/2 (callers match {ok, Db}).
create_db(DbName) ->
    AdminCtx = #user_ctx{roles = [<<"_admin">>]},
    Options = [{user_ctx, AdminCtx}, overwrite],
    couch_db:create(DbName, Options).
466
467
% Deletes the database behind the handle Db using the _admin role.
% Crashes with badmatch unless couch_server:delete/2 returns ok.
delete_db(Db) ->
    Name = couch_db:name(Db),
    Options = [{user_ctx, #user_ctx{roles = [<<"_admin">>]}}],
    ok = couch_server:delete(Name, Options).
471