1#!/usr/bin/python
2
3try: import simplejson as json
4except ImportError: import json
5import couchdb
6import httplib
7import urllib
8import common
9import unittest
10import time
11
12
# Address (host:port) of the CouchDB instance the suite talks to.
HOST = "localhost:5984"
# Name of the set view exercised by this test module.
SET_NAME = "test_suite_set_view"
# Number of partitions and documents handed to the `common` helpers.
NUM_PARTS = 8
NUM_DOCS = 400000
# Design document: a plain map view plus the same map paired with the
# built-in "_count" reduce.
DDOC = dict(
    _id="_design/test",
    language="javascript",
    views=dict(
        mapview=dict(
            map="function(doc) { emit(doc.integer, doc.string); }",
        ),
        redview=dict(
            map="function(doc) { emit(doc.integer, doc.string); }",
            reduce="_count",
        ),
    ),
)
30
31
32class TestReplicaIndex(unittest.TestCase):
33
34    def setUp(self):
35        self._params = {
36            "host": HOST,
37            "ddoc": DDOC,
38            "nparts": NUM_PARTS,
39            "ndocs": NUM_DOCS,
40            "setname": SET_NAME,
41            "server": couchdb.Server(url = "http://" + HOST)
42            }
43        # print "Creating databases"
44        common.create_dbs(self._params)
45        common.populate(self._params)
46        # print "Databases created"
47        # print "Configuring set view with:"
48        # print "\tmaximum of 8 partitions"
49        # print "\tactive partitions = [0, 1, 2, 3]"
50        # print "\tpassive partitions = []"
51        # print "\treplica index = True"
52        active_parts = [0, 1, 2, 3]
53        common.define_set_view(self._params, active_parts, [], True)
54
55
56    def tearDown(self):
57        # print "Deleting test data"
58        common.create_dbs(self._params, True)
59
60
61    def test_replica_index(self):
62        # print "Querying map view in steady state"
63        (resp, view_result) = common.query(self._params, "mapview")
64
65        expected_row_count = common.set_doc_count(self._params, [0, 1, 2, 3])
66
67        self.assertEqual(view_result["total_rows"], expected_row_count,
68                         "Query returned %d total_rows" % expected_row_count)
69        self.assertEqual(len(view_result["rows"]), expected_row_count,
70                         "Query returned %d rows" % expected_row_count)
71        common.test_keys_sorted(view_result)
72
73        # print "verifying group info"
74        info = common.get_set_view_info(self._params)
75        self.assertEqual(info["active_partitions"], [0, 1, 2, 3], "right active partitions list")
76        self.assertEqual(info["passive_partitions"], [], "right passive partitions list")
77        self.assertEqual(info["cleanup_partitions"], [], "right cleanup partitions list")
78        self.assertEqual(info["replica_partitions"], [],
79                         "main group has [] as replica partitions")
80        self.assertEqual(info["replicas_on_transfer"], [],
81                         "main group has [] as replicas on transfer")
82        self.assertEqual(sorted(info["update_seqs"].keys()), ["0", "1", "2", "3"],
83                         "right keys in update_seqs of the main index")
84        for i in [0, 1, 2, 3]:
85            expected_seq = common.partition_update_seq(self._params, i)
86            self.assertEqual(info["update_seqs"][str(i)], expected_seq,
87                             "right update seq number (%d) for partition %d" % (expected_seq, i + 1))
88        self.assertEqual(type(info["replica_group_info"]), dict, "group has replica_group_info")
89        self.assertEqual(info["replica_group_info"]["max_number_partitions"], 8,
90                          "replica group defined with maximum of 8 partitions")
91        self.assertEqual(info["replica_group_info"]["active_partitions"], [],
92                          "replica group has no active partitions")
93        self.assertEqual(info["replica_group_info"]["passive_partitions"], [],
94                          "replica group has no passive partitions")
95        self.assertEqual(info["replica_group_info"]["cleanup_partitions"], [],
96                          "replica group has no cleanup partitions")
97        self.assertEqual(info["replica_group_info"]["update_seqs"].keys(), [],
98                          "replica group update_seqs is empty")
99        self.assertEqual(info["replica_group_info"]["stats"]["partial_updates"], 0,
100                          "replica group has 0 partial updates")
101        self.assertEqual(info["replica_group_info"]["stats"]["full_updates"], 0,
102                          "replica group has 0 full updates")
103
104        # print "defining partitions [4, 5, 6, 7] as replicas"
105        common.add_replica_partitions(self._params, [4, 5, 6, 7])
106
107        # print "verifying group info"
108        info = common.get_set_view_info(self._params)
109        self.assertEqual(info["active_partitions"], [0, 1, 2, 3], "right active partitions list")
110        self.assertEqual(info["passive_partitions"], [], "right passive partitions list")
111        self.assertEqual(info["cleanup_partitions"], [], "right cleanup partitions list")
112        self.assertEqual(info["replica_partitions"], [4, 5, 6, 7],
113                         "main group has [4, 5, 6, 7] as replica partitions")
114        self.assertEqual(info["replicas_on_transfer"], [],
115                         "main group has [] as replicas on transfer")
116        self.assertEqual(info["replica_group_info"]["passive_partitions"], [4, 5, 6, 7],
117                          "replica group has [4, 5, 6, 7] as passive partitions")
118        self.assertEqual(info["replica_group_info"]["active_partitions"], [],
119                          "replica group has no active partitions")
120        self.assertEqual(info["replica_group_info"]["cleanup_partitions"], [],
121                          "replica group has no cleanup partitions")
122
123        # print "waiting for replica index to update"
124        seconds = 0
125        while info["replica_group_info"]["stats"]["full_updates"] < 1:
126            time.sleep(5)
127            seconds += 5
128            if seconds > 900:
129                raise(Exception("timeout waiting for replica index full update"))
130            info = common.get_set_view_info(self._params)
131
132        self.assertEqual(sorted(info["replica_group_info"]["update_seqs"].keys()), ["4", "5", "6", "7"],
133                          "replica group update_seqs has keys [4, 5, 6, 7]")
134        for i in [4, 5, 6, 7]:
135            expected_seq = common.partition_update_seq(self._params, i)
136            self.assertEqual(info["replica_group_info"]["update_seqs"][str(i)], expected_seq,
137                             "right update seq number (%d) for replica partition %d" % (expected_seq, i + 1))
138
139        # print "Querying map view after marking partitions [4, 5, 6, 7] as replicas (not yet active)"
140        (resp, view_result) = common.query(self._params, "mapview")
141
142        expected_row_count = common.set_doc_count(self._params, [0, 1, 2, 3])
143        self.assertEqual(len(view_result["rows"]), expected_row_count,
144                         "Query returned %d rows" % expected_row_count)
145        common.test_keys_sorted(view_result)
146
147        doc_count = common.set_doc_count(self._params, [0, 1, 2, 3, 4, 5, 6, 7])
148        all_keys = {}
149        for row in view_result["rows"]:
150            all_keys[row["key"]] = row["value"]
151        for key in xrange(5, doc_count, self._params["nparts"]):
152            self.assertFalse(key in all_keys, "Key from partition 4 not in view result")
153        for key in xrange(6, doc_count, self._params["nparts"]):
154            self.assertFalse(key in all_keys, "Key from partition 5 not in view result")
155        for key in xrange(7, doc_count, self._params["nparts"]):
156            self.assertFalse(key in all_keys, "Key from partition 6 not in view result")
157        for key in xrange(8, doc_count, self._params["nparts"]):
158            self.assertFalse(key in all_keys, "Key from partition 7 not in view result")
159
160        # print "Querying reduce view, with ?group=false, after marking partitions [4, 5, 6, 7] as replicas (not yet active)"
161        (resp, red_view_result_nogroup) = common.query(self._params, "redview")
162
163        expected_reduce_value = common.set_doc_count(self._params, [0, 1, 2, 3])
164        self.assertEqual(len(red_view_result_nogroup["rows"]), 1, "Query returned 1 row")
165        self.assertTrue(red_view_result_nogroup["rows"][0]["key"] is None, "Key is null")
166        self.assertEqual(red_view_result_nogroup["rows"][0]["value"], expected_reduce_value,
167                         "Reduce value is %d with ?group=false" % expected_reduce_value)
168
169        # print "Querying reduce view, with ?group=true, after marking partitions [4, 5, 6, 7] as replicas (not yet active)"
170        (resp, red_view_result_group) = common.query(self._params, "redview", {"group": "true"})
171
172        expected_row_count = common.set_doc_count(self._params, [0, 1, 2, 3])
173        self.assertEqual(len(red_view_result_group["rows"]), expected_row_count,
174                         "Query returned %d rows" % expected_row_count)
175        common.test_keys_sorted(red_view_result_group)
176
177        doc_count = common.set_doc_count(self._params, [0, 1, 2, 3, 4, 5, 6, 7])
178        all_keys = {}
179        for row in red_view_result_group["rows"]:
180            all_keys[row["key"]] = row["value"]
181            self.assertEqual(row["value"], 1, "Value for key %d is 1" % row["key"])
182        for key in xrange(5, doc_count, self._params["nparts"]):
183            self.assertFalse(key in all_keys, "Key from partition 4 not in view result")
184        for key in xrange(6, doc_count, self._params["nparts"]):
185            self.assertFalse(key in all_keys, "Key from partition 5 not in view result")
186        for key in xrange(7, doc_count, self._params["nparts"]):
187            self.assertFalse(key in all_keys, "Key from partition 6 not in view result")
188        for key in xrange(8, doc_count, self._params["nparts"]):
189            self.assertFalse(key in all_keys, "Key from partition 7 not in view result")
190
191        new_info = common.get_set_view_info(self._params)
192        common.restart_server(self._params)
193
194        # print "Querying map view again after immediately after restarting the server"
195        (resp, view_result_after) = common.query(self._params, "mapview")
196        self.assertEqual(view_result_after["rows"], view_result["rows"],
197                         "Same view rows after server restart")
198
199        # print "Querying reduce view, with ?group=false, after restarting the server"
200        (resp, red_view_result_nogroup_after) = common.query(self._params, "redview")
201        self.assertEqual(red_view_result_nogroup_after["rows"],
202                         red_view_result_nogroup["rows"],
203                         "Same view result as before")
204
205        # print "Querying reduce view, with ?group=true, after restarting the server"
206        (resp, red_view_result_group_after) = common.query(self._params, "redview", {"group": "true"})
207        self.assertEqual(red_view_result_group_after["rows"],
208                         red_view_result_group["rows"],
209                         "Same view result as before")
210
211        # print "removing partitions [5, 7] from replica set"
212        common.remove_replica_partitions(self._params, [5, 7])
213
214        # print "verifying group info"
215        info = common.get_set_view_info(self._params)
216        self.assertEqual(info["active_partitions"], [0, 1, 2, 3], "right active partitions list")
217        self.assertEqual(info["passive_partitions"], [], "right passive partitions list")
218        self.assertEqual(info["cleanup_partitions"], [], "right cleanup partitions list")
219        self.assertEqual(info["replica_partitions"], [4, 6],
220                         "main group has [4, 6] as replica partitions")
221        self.assertEqual(info["replicas_on_transfer"], [],
222                         "main group has [] as replicas on transfer")
223        self.assertEqual(info["replica_group_info"]["passive_partitions"], [4, 6],
224                          "replica group has [4, 6] as passive partitions")
225        self.assertEqual(info["replica_group_info"]["active_partitions"], [],
226                          "replica group has no active partitions")
227        self.assertEqual(info["replica_group_info"]["cleanup_partitions"], [5, 7],
228                          "replica group has [5, 7] as cleanup partitions")
229
230        # print "waiting for replica index to finish cleanup"
231        seconds = 0
232        while info["replica_group_info"]["stats"]["cleanups"] < 1:
233            time.sleep(5)
234            seconds += 5
235            if seconds > 900:
236                raise(Exception("timeout waiting for replica index cleanup"))
237            info = common.get_set_view_info(self._params)
238
239        self.assertEqual(info["replica_group_info"]["passive_partitions"], [4, 6],
240                          "replica group has [4, 6] as passive partitions")
241        self.assertEqual(info["replica_group_info"]["active_partitions"], [],
242                          "replica group has no active partitions")
243        self.assertEqual(info["replica_group_info"]["cleanup_partitions"], [],
244                          "replica group has [] as cleanup partitions")
245        self.assertEqual(sorted(info["replica_group_info"]["update_seqs"].keys()), ["4", "6"],
246                          "replica group update_seqs has keys [4, 6")
247
248        new_info = common.get_set_view_info(self._params)
249        self.restart_compare_info(info, new_info)
250
251        info = common.get_set_view_info(self._params)
252        updates_before = info["replica_group_info"]["stats"]["full_updates"]
253
254        # print "defining partitions [5, 7] as replicas again"
255        common.add_replica_partitions(self._params, [5, 7])
256
257        # replica group likely has [5, 7] in a pending transition, just wait a while if so
258        self.wait_for_pending_transition_applied()
259
260        # print "verifying group info"
261        info = common.get_set_view_info(self._params)
262        self.assertEqual(info["active_partitions"], [0, 1, 2, 3], "right active partitions list")
263        self.assertEqual(info["passive_partitions"], [], "right passive partitions list")
264        self.assertEqual(info["cleanup_partitions"], [], "right cleanup partitions list")
265        self.assertEqual(info["replica_partitions"], [4, 5, 6, 7],
266                         "main group has [4, 5, 6, 7] as replica partitions")
267        self.assertEqual(info["replicas_on_transfer"], [],
268                         "main group has [] as replicas on transfer")
269        self.assertEqual(info["replica_group_info"]["passive_partitions"], [4, 5, 6, 7],
270                          "replica group has [4, 5, 6, 7] as passive partitions")
271        self.assertEqual(info["replica_group_info"]["active_partitions"], [],
272                          "replica group has no active partitions")
273        self.assertEqual(info["replica_group_info"]["cleanup_partitions"], [],
274                          "replica group has no cleanup partitions")
275
276        # print "waiting for replica index to index partitions 5 and 7"
277        seconds = 0
278        while info["replica_group_info"]["stats"]["full_updates"] < (updates_before + 1):
279            time.sleep(5)
280            seconds += 5
281            if seconds > 900:
282                raise(Exception("timeout waiting for replica index full update"))
283            info = common.get_set_view_info(self._params)
284
285        self.assertEqual(sorted(info["replica_group_info"]["update_seqs"].keys()), ["4", "5", "6", "7"],
286                          "replica group update_seqs has keys [4, 5, 6, 7]")
287        for i in [4, 5, 6, 7]:
288            expected_seq = common.partition_update_seq(self._params, i)
289            self.assertEqual(info["replica_group_info"]["update_seqs"][str(i)], expected_seq,
290                             "right update seq number (%d) for replica partition %d" % (expected_seq, i + 1))
291
292        # print "Querying map view after marking partitions [5, 7] as replicas again (not yet active)"
293        (resp, view_result) = common.query(self._params, "mapview")
294
295        expected_row_count = common.set_doc_count(self._params, [0, 1, 2, 3])
296        self.assertEqual(len(view_result["rows"]), expected_row_count,
297                         "Query returned %d rows" % expected_row_count)
298        common.test_keys_sorted(view_result)
299
300        doc_count = common.set_doc_count(self._params, [0, 1, 2, 3, 4, 5, 6, 7])
301        all_keys = {}
302        for row in view_result["rows"]:
303            all_keys[row["key"]] = row["value"]
304        for key in xrange(5, doc_count, self._params["nparts"]):
305            self.assertFalse(key in all_keys, "Key from partition 4 not in view result")
306        for key in xrange(6, doc_count, self._params["nparts"]):
307            self.assertFalse(key in all_keys, "Key from partition 5 not in view result")
308        for key in xrange(7, doc_count, self._params["nparts"]):
309            self.assertFalse(key in all_keys, "Key from partition 6 not in view result")
310        for key in xrange(8, doc_count, self._params["nparts"]):
311            self.assertFalse(key in all_keys, "Key from partition 7 not in view result")
312
313        # print "Querying reduce view, with ?group=false, after marking partitions [5, 7] as replicas again (not yet active)"
314        (resp, red_view_result_nogroup) = common.query(self._params, "redview")
315
316        expected_reduce_value = common.set_doc_count(self._params, [0, 1, 2, 3])
317        self.assertEqual(len(red_view_result_nogroup["rows"]), 1, "Query returned 1 row")
318        self.assertTrue(red_view_result_nogroup["rows"][0]["key"] is None, "Key is null")
319        self.assertEqual(red_view_result_nogroup["rows"][0]["value"], expected_reduce_value,
320                         "Reduce value is %d with ?group=false" % expected_reduce_value)
321
322        # print "Querying reduce view, with ?group=true, after marking partitions [5, 7] as replicas again (not yet active)"
323        (resp, red_view_result_group) = common.query(self._params, "redview", {"group": "true"})
324
325        expected_row_count = common.set_doc_count(self._params, [0, 1, 2, 3])
326        self.assertEqual(len(red_view_result_group["rows"]), expected_row_count,
327                         "Query returned %d rows" % expected_row_count)
328        common.test_keys_sorted(red_view_result_group)
329
330        doc_count = common.set_doc_count(self._params, [0, 1, 2, 3, 4, 5, 6, 7])
331        all_keys = {}
332        for row in red_view_result_group["rows"]:
333            all_keys[row["key"]] = row["value"]
334            self.assertEqual(row["value"], 1, "Value for key %d is 1" % row["key"])
335        for key in xrange(5, doc_count, self._params["nparts"]):
336            self.assertFalse(key in all_keys, "Key from partition 4 not in view result")
337        for key in xrange(6, doc_count, self._params["nparts"]):
338            self.assertFalse(key in all_keys, "Key from partition 5 not in view result")
339        for key in xrange(7, doc_count, self._params["nparts"]):
340            self.assertFalse(key in all_keys, "Key from partition 6 not in view result")
341        for key in xrange(8, doc_count, self._params["nparts"]):
342            self.assertFalse(key in all_keys, "Key from partition 7 not in view result")
343
344        # print "Marking partitions [4, 5, 7] as active (to be transferred from replica to main index)"
345        common.set_partition_states(self._params, active = [4, 5, 7])
346
347        info = common.get_set_view_info(self._params)
348        self.assertEqual(info["active_partitions"], [0, 1, 2, 3], "right active partitions list")
349        self.assertEqual(info["passive_partitions"], [4, 5, 7], "right passive partitions list")
350        self.assertEqual(info["cleanup_partitions"], [], "right cleanup partitions list")
351        self.assertEqual(info["replica_partitions"], [4, 5, 6, 7],
352                         "main group has [4, 5, 6, 7] as replica partitions")
353        self.assertEqual(info["replicas_on_transfer"], [4, 5, 7],
354                         "main group has [4, 5, 7] as replicas on transfer")
355        self.assertEqual(info["replica_group_info"]["passive_partitions"], [6],
356                          "replica group has [6] as passive partitions")
357        self.assertEqual(info["replica_group_info"]["active_partitions"], [4, 5, 7],
358                          "replica group has [4, 5, 7] as active partitions")
359        self.assertEqual(info["replica_group_info"]["cleanup_partitions"], [],
360                          "replica group has no cleanup partitions")
361
362        # print "Querying map view with ?stale=update_after to trigger transferral of" + \
363        #     " replica partitions [4, 5, 7] from replica index to main index"
364
365        updates_before = info["stats"]["full_updates"]
366
367        (resp, view_result) = common.query(self._params, "mapview", {"stale": "update_after"})
368
369        expected_row_count = common.set_doc_count(self._params, [0, 1, 2, 3, 4, 5, 7])
370        self.assertEqual(len(view_result["rows"]), expected_row_count,
371                         "Query returned %d rows" % expected_row_count)
372        common.test_keys_sorted(view_result)
373
374        all_keys = {}
375        for row in view_result["rows"]:
376            all_keys[row["key"]] = row["value"]
377        for key in xrange(7, common.set_doc_count(self._params, [0, 1, 2, 3, 4, 5, 6, 7]), self._params["nparts"]):
378            self.assertFalse(key in all_keys, "Key from partition 6 not in view result")
379
380        # print "Querying reduce view, with ?group=false, after triggering transfer of replica partitions [4, 5, 7]"
381        (resp, red_view_result_nogroup) = common.query(self._params, "redview", {"stale": "ok"})
382
383        expected_reduce_value = common.set_doc_count(self._params, [0, 1, 2, 3, 4, 5, 7])
384        self.assertEqual(len(red_view_result_nogroup["rows"]), 1, "Query returned 1 row")
385        self.assertTrue(red_view_result_nogroup["rows"][0]["key"] is None, "Key is null")
386        self.assertEqual(red_view_result_nogroup["rows"][0]["value"], expected_reduce_value,
387                         "Reduce value is %d with ?group=false" % expected_reduce_value)
388
389        # print "Querying reduce view, with ?group=true, after triggering transfer of replica partitions [4, 5, 7]"
390        (resp, red_view_result_group) = common.query(self._params, "redview", {"group": "true", "stale": "ok"})
391
392        expected_row_count = common.set_doc_count(self._params, [0, 1, 2, 3, 4, 5, 7])
393        self.assertEqual(len(red_view_result_group["rows"]), expected_row_count,
394                         "Query returned %d rows" % expected_row_count)
395        common.test_keys_sorted(red_view_result_group)
396
397        doc_count = common.set_doc_count(self._params, [0, 1, 2, 3, 4, 5, 6, 7])
398        all_keys = {}
399        for row in red_view_result_group["rows"]:
400            all_keys[row["key"]] = row["value"]
401            self.assertEqual(row["value"], 1, "Value for key %d is 1" % row["key"])
402        for key in xrange(7, doc_count, self._params["nparts"]):
403            self.assertFalse(key in all_keys, "Key from partition 6 not in view result")
404
405        # print "Waiting for transferral of replica partitions [4, 5, 7] into main index"
406        info = common.get_set_view_info(self._params)
407        seconds = 0
408        while info["stats"]["full_updates"] < (updates_before + 1):
409            time.sleep(5)
410            seconds += 5
411            if seconds > 900:
412                raise(Exception("timeout waiting for transfer of replica partitions [4, 5, 7]"))
413            info = common.get_set_view_info(self._params)
414
415        self.assertEqual(info["active_partitions"], [0, 1, 2, 3, 4, 5, 7], "right active partitions list")
416        self.assertEqual(info["passive_partitions"], [], "right passive partitions list")
417        self.assertEqual(info["cleanup_partitions"], [], "right cleanup partitions list")
418        self.assertEqual(info["replica_partitions"], [6], "main group has [6] as replica partitions")
419        self.assertEqual(info["replicas_on_transfer"], [], "main group has [] as replicas on transfer")
420        self.assertEqual(info["replica_group_info"]["passive_partitions"], [6],
421                          "replica group has [6] as passive partitions")
422        self.assertEqual(info["replica_group_info"]["active_partitions"], [],
423                          "replica group has [] as active partitions")
424        self.assertEqual(sorted(info["update_seqs"].keys()), ["0", "1", "2", "3", "4", "5", "7"],
425                         "Right list of update seqs for main group")
426        self.assertEqual(sorted(info["replica_group_info"]["update_seqs"].keys()), ["6"],
427                         "Right list of update seqs for replica group")
428        for i in [0, 1, 2, 3, 4, 5, 7]:
429            expected_seq = common.partition_update_seq(self._params, i)
430            self.assertEqual(info["update_seqs"][str(i)], expected_seq,
431                             "right update seq number (%d) for partition %d in main group" % (expected_seq, i + 1))
432        for i in [6]:
433            expected_seq = common.partition_update_seq(self._params, i)
434            self.assertEqual(info["replica_group_info"]["update_seqs"][str(i)], expected_seq,
435                             "right update seq number (%d) for partition %d in replica group" % (expected_seq, i + 1))
436
437        # print "Querying map view after replica partitions were transferred"
438        (resp, view_result_after) = common.query(self._params, "mapview")
439
440        expected_row_count = common.set_doc_count(self._params, [0, 1, 2, 3, 4, 5, 7])
441        self.assertEqual(len(view_result_after["rows"]), expected_row_count,
442                         "Query returned %d rows" % expected_row_count)
443        self.assertEqual(len(view_result_after["rows"]), len(view_result["rows"]),
444                         "Same number of view rows after replica partitions were transferred")
445        for i in xrange(len(view_result_after["rows"])):
446            self.assertEqual(view_result_after["rows"][i], view_result["rows"][i],
447                             "Row %d has same content after replica partitions were transferred" % (i + 1))
448
449        # print "Querying reduce view, with ?group=false, after replica partitions were transferred"
450        (resp, red_view_result_nogroup) = common.query(self._params, "redview", {"stale": "ok"})
451
452        expected_reduce_value = common.set_doc_count(self._params, [0, 1, 2, 3, 4, 5, 7])
453        self.assertEqual(len(red_view_result_nogroup["rows"]), 1, "Query returned 1 row")
454        self.assertTrue(red_view_result_nogroup["rows"][0]["key"] is None, "Key is null")
455        self.assertEqual(red_view_result_nogroup["rows"][0]["value"], expected_reduce_value,
456                         "Reduce value is %d with ?group=false" % expected_reduce_value)
457
458        # print "Querying reduce view, with ?group=true, after replica partitions were transferred"
459        (resp, red_view_result_group) = common.query(self._params, "redview", {"group": "true", "stale": "ok"})
460
461        expected_row_count = common.set_doc_count(self._params, [0, 1, 2, 3, 4, 5, 7])
462        self.assertEqual(len(red_view_result_group["rows"]), expected_row_count,
463                         "Query returned %d rows" % expected_row_count)
464        common.test_keys_sorted(red_view_result_group)
465
466        doc_count = common.set_doc_count(self._params, [0, 1, 2, 3, 4, 5, 6, 7])
467        all_keys = {}
468        for row in red_view_result_group["rows"]:
469            all_keys[row["key"]] = row["value"]
470            self.assertEqual(row["value"], 1, "Value for key %d is 1" % row["key"])
471        for key in xrange(7, doc_count, self._params["nparts"]):
472            self.assertFalse(key in all_keys, "Key from partition 6 not in view result")
473
474        # print "Marking partitions [4, 5, 7] for cleanup in the main group"
475        common.set_partition_states(self._params, cleanup = [4, 5, 7])
476
477        # print "Waiting for cleanup (compaction) to finish"
478        common.compact_set_view(self._params)
479
480        info = common.get_set_view_info(self._params)
481        self.assertEqual(info["active_partitions"], [0, 1, 2, 3], "right active partitions list")
482        self.assertEqual(info["passive_partitions"], [], "right passive partitions list")
483        self.assertEqual(info["cleanup_partitions"], [], "right cleanup partitions list")
484        self.assertEqual(info["replica_partitions"], [6], "main group has [6] as replica partitions")
485        self.assertEqual(info["replicas_on_transfer"], [], "main group has [] as replicas on transfer")
486        self.assertEqual(info["replica_group_info"]["passive_partitions"], [6],
487                          "replica group has [6] as passive partitions")
488        self.assertEqual(info["replica_group_info"]["active_partitions"], [],
489                          "replica group has [] active partitions")
490
491        # print "Querying map view after marking partitions [4, 5, 7] for cleanup in the main group"
492        (resp, view_result) = common.query(self._params, "mapview")
493
494        expected_row_count = common.set_doc_count(self._params, [0, 1, 2, 3])
495        self.assertEqual(len(view_result["rows"]), expected_row_count,
496                         "Query returned %d rows" % expected_row_count)
497        common.test_keys_sorted(view_result)
498
499        doc_count = common.set_doc_count(self._params, [0, 1, 2, 3, 4, 5, 6, 7])
500        all_keys = {}
501        for row in view_result["rows"]:
502            all_keys[row["key"]] = row["value"]
503        for key in xrange(5, doc_count, self._params["nparts"]):
504            self.assertFalse(key in all_keys, "Key from partition 4 not in view result")
505        for key in xrange(6, doc_count, self._params["nparts"]):
506            self.assertFalse(key in all_keys, "Key from partition 5 not in view result")
507        for key in xrange(7, doc_count, self._params["nparts"]):
508            self.assertFalse(key in all_keys, "Key from partition 6 not in view result")
509        for key in xrange(8, doc_count, self._params["nparts"]):
510            self.assertFalse(key in all_keys, "Key from partition 7 not in view result")
511
512        # print "Querying reduce view, with ?group=false, after marking partitions [4, 5, 7] for cleanup in the main group"
513        (resp, red_view_result_nogroup) = common.query(self._params, "redview", {"stale": "ok"})
514
515        expected_reduce_value = common.set_doc_count(self._params, [0, 1, 2, 3])
516        self.assertEqual(len(red_view_result_nogroup["rows"]), 1, "Query returned 1 row")
517        self.assertTrue(red_view_result_nogroup["rows"][0]["key"] is None, "Key is null")
518        self.assertEqual(red_view_result_nogroup["rows"][0]["value"], expected_reduce_value,
519                         "Reduce value is %d with ?group=false" % expected_reduce_value)
520
521        # print "Querying reduce view, with ?group=true, after marking partitions [4, 5, 7] for cleanup in the main group"
522        (resp, red_view_result_group) = common.query(self._params, "redview", {"group": "true", "stale": "ok"})
523
524        expected_row_count = common.set_doc_count(self._params, [0, 1, 2, 3])
525        self.assertEqual(len(red_view_result_group["rows"]), expected_row_count,
526                         "Query returned %d rows" % expected_row_count)
527        common.test_keys_sorted(red_view_result_group)
528
529        doc_count = common.set_doc_count(self._params, [0, 1, 2, 3, 4, 5, 6, 7])
530        all_keys = {}
531        for row in red_view_result_group["rows"]:
532            all_keys[row["key"]] = row["value"]
533            self.assertEqual(row["value"], 1, "Value for key %d is 1" % row["key"])
534        for key in xrange(5, doc_count, self._params["nparts"]):
535            self.assertFalse(key in all_keys, "Key from partition 4 not in view result")
536        for key in xrange(6, doc_count, self._params["nparts"]):
537            self.assertFalse(key in all_keys, "Key from partition 5 not in view result")
538        for key in xrange(7, doc_count, self._params["nparts"]):
539            self.assertFalse(key in all_keys, "Key from partition 6 not in view result")
540        for key in xrange(8, doc_count, self._params["nparts"]):
541            self.assertFalse(key in all_keys, "Key from partition 7 not in view result")
542
543        # print "defining partitions [4, 5, 7] as replicas again"
544        common.add_replica_partitions(self._params, [4, 5, 7])
545
546        info = common.get_set_view_info(self._params)
547        self.assertEqual(info["replica_partitions"], [4, 5, 6, 7],
548                         "main group has [4, 5, 6, 7] as replica partitions")
549        self.assertEqual(info["replicas_on_transfer"], [],
550                         "main group has [] as replicas on transfer")
551        self.assertEqual(info["replica_group_info"]["passive_partitions"], [4, 5, 6, 7],
552                          "replica group has [4, 5, 6, 7] as passive partitions")
553        self.assertEqual(info["replica_group_info"]["active_partitions"], [],
554                          "replica group has no active partitions")
555        self.assertEqual(info["replica_group_info"]["cleanup_partitions"], [],
556                          "replica group has no cleanup partitions")
557
558        # print "removing partitions [4, 5, 7] from replica set again"
559        common.remove_replica_partitions(self._params, [4, 5, 7])
560
561        info = common.get_set_view_info(self._params)
562        self.assertEqual(info["active_partitions"], [0, 1, 2, 3], "right active partitions list")
563        self.assertEqual(info["passive_partitions"], [], "right passive partitions list")
564        self.assertEqual(info["cleanup_partitions"], [], "right cleanup partitions list")
565        self.assertEqual(info["replica_partitions"], [6], "main group has [6] as replica partitions")
566        self.assertEqual(info["replicas_on_transfer"], [], "main group has [] as replicas on transfer")
567        self.assertEqual(info["replica_group_info"]["passive_partitions"], [6],
568                          "replica group has [6] as passive partitions")
569        self.assertEqual(info["replica_group_info"]["active_partitions"], [],
570                          "replica group has no active partitions")
571        self.assertEqual(info["replica_group_info"]["cleanup_partitions"], [4, 5, 7],
572                          "replica group has [4, 5, 7] as cleanup partitions")
573
574        # print "defining partitions [4, 5, 7] as replicas again"
575        common.add_replica_partitions(self._params, [4, 5, 7])
576        self.wait_for_pending_transition_applied()
577
578        info = common.get_set_view_info(self._params)
579        self.assertEqual(info["replica_partitions"], [4, 5, 6, 7],
580                         "main group has [4, 5, 6, 7] as replica partitions")
581        self.assertEqual(info["replicas_on_transfer"], [],
582                         "main group has [] as replicas on transfer")
583        self.assertEqual(info["replica_group_info"]["passive_partitions"], [4, 5, 6, 7],
584                          "replica group has [4, 5, 6, 7] as passive partitions")
585        self.assertEqual(info["replica_group_info"]["active_partitions"], [],
586                          "replica group has no active partitions")
587        self.assertEqual(info["replica_group_info"]["cleanup_partitions"], [],
588                          "replica group has no cleanup partitions")
589
590        # print "Querying map view after marking partitions [4, 5, 7] as replicas (not active)"
591        (resp, view_result) = common.query(self._params, "mapview")
592
593        expected_row_count = common.set_doc_count(self._params, [0, 1, 2, 3])
594        self.assertEqual(len(view_result["rows"]), expected_row_count,
595                         "Query returned %d rows" % expected_row_count)
596        common.test_keys_sorted(view_result)
597
598        doc_count = common.set_doc_count(self._params, [0, 1, 2, 3, 4, 5, 6, 7])
599        all_keys = {}
600        for row in view_result["rows"]:
601            all_keys[row["key"]] = row["value"]
602        for key in xrange(1, doc_count, self._params["nparts"]):
603            self.assertTrue(key in all_keys, "Key from partition 0 in view result")
604        for key in xrange(2, doc_count, self._params["nparts"]):
605            self.assertTrue(key in all_keys, "Key from partition 1 in view result")
606        for key in xrange(3, doc_count, self._params["nparts"]):
607            self.assertTrue(key in all_keys, "Key from partition 2 in view result")
608        for key in xrange(4, doc_count, self._params["nparts"]):
609            self.assertTrue(key in all_keys, "Key from partition 3 in view result")
610        for key in xrange(5, doc_count, self._params["nparts"]):
611            self.assertFalse(key in all_keys, "Key from partition 4 not in view result")
612        for key in xrange(6, doc_count, self._params["nparts"]):
613            self.assertFalse(key in all_keys, "Key from partition 5 not in view result")
614        for key in xrange(7, doc_count, self._params["nparts"]):
615            self.assertFalse(key in all_keys, "Key from partition 6 not in view result")
616        for key in xrange(8, doc_count, self._params["nparts"]):
617            self.assertFalse(key in all_keys, "Key from partition 7 not in view result")
618
619        # print "Querying reduce view, with ?group=false, after marking partitions [4, 5, 7] as replicas (not active)"
620        (resp, red_view_result_nogroup) = common.query(self._params, "redview", {"stale": "ok"})
621
622        expected_reduce_value = common.set_doc_count(self._params, [0, 1, 2, 3])
623        self.assertEqual(len(red_view_result_nogroup["rows"]), 1, "Query returned 1 row")
624        self.assertTrue(red_view_result_nogroup["rows"][0]["key"] is None, "Key is null")
625        self.assertEqual(red_view_result_nogroup["rows"][0]["value"], expected_reduce_value,
626                         "Reduce value is %d with ?group=false" % expected_reduce_value)
627
628        # print "Querying reduce view, with ?group=true, after marking partitions [4, 5, 7] as replicas (not active)"
629        (resp, red_view_result_group) = common.query(self._params, "redview", {"group": "true", "stale": "ok"})
630
631        expected_row_count = common.set_doc_count(self._params, [0, 1, 2, 3])
632        self.assertEqual(len(red_view_result_group["rows"]), expected_row_count,
633                         "Query returned %d rows" % expected_row_count)
634        common.test_keys_sorted(red_view_result_group)
635
636        doc_count = common.set_doc_count(self._params, [0, 1, 2, 3, 4, 5, 6, 7])
637        all_keys = {}
638        for row in red_view_result_group["rows"]:
639            all_keys[row["key"]] = row["value"]
640            self.assertEqual(row["value"], 1, "Value for key %d is 1" % row["key"])
641        for key in xrange(1, doc_count, self._params["nparts"]):
642            self.assertTrue(key in all_keys, "Key from partition 0 in view result")
643        for key in xrange(2, doc_count, self._params["nparts"]):
644            self.assertTrue(key in all_keys, "Key from partition 1 in view result")
645        for key in xrange(3, doc_count, self._params["nparts"]):
646            self.assertTrue(key in all_keys, "Key from partition 2 in view result")
647        for key in xrange(4, doc_count, self._params["nparts"]):
648            self.assertTrue(key in all_keys, "Key from partition 3 in view result")
649        for key in xrange(5, doc_count, self._params["nparts"]):
650            self.assertFalse(key in all_keys, "Key from partition 4 not in view result")
651        for key in xrange(6, doc_count, self._params["nparts"]):
652            self.assertFalse(key in all_keys, "Key from partition 5 not in view result")
653        for key in xrange(7, doc_count, self._params["nparts"]):
654            self.assertFalse(key in all_keys, "Key from partition 6 not in view result")
655        for key in xrange(8, doc_count, self._params["nparts"]):
656            self.assertFalse(key in all_keys, "Key from partition 7 not in view result")
657
658        # print "Marking partitions [4, 5, 7] as active (to be transferred from replica to main index)"
659        common.set_partition_states(self._params, active = [4, 5, 7])
660
661        info = common.get_set_view_info(self._params)
662        self.assertEqual(info["active_partitions"], [0, 1, 2, 3], "right active partitions list")
663        self.assertEqual(info["passive_partitions"], [4, 5, 7], "right passive partitions list")
664        self.assertEqual(info["cleanup_partitions"], [], "right cleanup partitions list")
665        self.assertEqual(info["replica_partitions"], [4, 5, 6, 7],
666                         "main group has [4, 5, 6, 7] as replica partitions")
667        self.assertEqual(info["replicas_on_transfer"], [4, 5, 7],
668                         "main group has [4, 5, 7] as replicas on transfer")
669        self.assertEqual(info["replica_group_info"]["passive_partitions"], [6],
670                          "replica group has [6] as passive partitions")
671        self.assertEqual(info["replica_group_info"]["active_partitions"], [4, 5, 7],
672                          "replica group has [4, 5, 7] as active partitions")
673        self.assertEqual(info["replica_group_info"]["cleanup_partitions"], [],
674                          "replica group has no cleanup partitions")
675
676        # print "Querying view with ?stale=update_after to trigger transferral of" + \
677        #    " replica partitions [4, 5, 7] from replica index to main index"
678
679        updates_before = info["stats"]["full_updates"]
680
681        (resp, view_result) = common.query(self._params, "mapview", {"stale": "update_after", "limit": "1"})
682
683        # print "unmarking partitions [4, 5] as replicas while partitions [4, 5, 7] are being transferred from replica to main group"
684        common.remove_replica_partitions(self._params, [4, 5])
685
686        # print "Waiting for transferral of replica partitions [7] into main index"
687        info = common.get_set_view_info(self._params)
688        seconds = 0
689        while info["stats"]["full_updates"] < (updates_before + 1):
690            time.sleep(5)
691            seconds += 5
692            if seconds > 900:
693                raise(Exception("timeout waiting for transfer of replica partitions [7]"))
694            info = common.get_set_view_info(self._params)
695
696        self.assertEqual(info["active_partitions"], [0, 1, 2, 3, 7], "right active partitions list")
697        self.assertEqual(info["passive_partitions"], [], "right passive partitions list")
698        self.assertEqual(info["cleanup_partitions"], [], "right cleanup partitions list")
699        self.assertEqual(info["replica_partitions"], [6], "main group has [6] as replica partitions")
700        self.assertEqual(info["replicas_on_transfer"], [], "main group has [] as replicas on transfer")
701        self.assertEqual(info["replica_group_info"]["passive_partitions"], [6],
702                          "replica group has [6] as passive partitions")
703        self.assertEqual(info["replica_group_info"]["active_partitions"], [],
704                          "replica group has [] as active partitions")
705        self.assertEqual(sorted(info["update_seqs"].keys()), ["0", "1", "2", "3", "7"],
706                         "Right list of update seqs for main group")
707        self.assertEqual(sorted(info["replica_group_info"]["update_seqs"].keys()), ["6"],
708                         "Right list of update seqs for replica group")
709        for i in [0, 1, 2, 3, 7]:
710            expected_seq = common.partition_update_seq(self._params, i)
711            self.assertEqual(info["update_seqs"][str(i)], expected_seq,
712                             "right update seq number (%d) for partition %d in main group" % (expected_seq, i + 1))
713        for i in [6]:
714            expected_seq = common.partition_update_seq(self._params, i)
715            self.assertEqual(info["replica_group_info"]["update_seqs"][str(i)], expected_seq,
716                             "right update seq number (%d) for partition %d in replica group" % (expected_seq, i + 1))
717
718        # print "Querying map view after removing replica partitions [4, 5]"
719        (resp, view_result) = common.query(self._params, "mapview")
720
721        expected_row_count = common.set_doc_count(self._params, [0, 1, 2, 3, 7])
722        self.assertEqual(len(view_result["rows"]), expected_row_count,
723                         "Query returned %d rows" % expected_row_count)
724        common.test_keys_sorted(view_result)
725
726        doc_count = common.set_doc_count(self._params, [0, 1, 2, 3, 4, 5, 6, 7])
727        all_keys = {}
728        for row in view_result["rows"]:
729            all_keys[row["key"]] = row["value"]
730        for key in xrange(1, doc_count, self._params["nparts"]):
731            self.assertTrue(key in all_keys, "Key from partition 0 in view result")
732        for key in xrange(2, doc_count, self._params["nparts"]):
733            self.assertTrue(key in all_keys, "Key from partition 1 in view result")
734        for key in xrange(3, doc_count, self._params["nparts"]):
735            self.assertTrue(key in all_keys, "Key from partition 2 in view result")
736        for key in xrange(4, doc_count, self._params["nparts"]):
737            self.assertTrue(key in all_keys, "Key from partition 3 in view result")
738        for key in xrange(5, doc_count, self._params["nparts"]):
739            self.assertFalse(key in all_keys, "Key from partition 4 not in view result")
740        for key in xrange(6, doc_count, self._params["nparts"]):
741            self.assertFalse(key in all_keys, "Key from partition 5 not in view result")
742        for key in xrange(7, doc_count, self._params["nparts"]):
743            self.assertFalse(key in all_keys, "Key from partition 6 not in view result")
744        for key in xrange(8, doc_count, self._params["nparts"]):
745            self.assertTrue(key in all_keys, "Key from partition 7 in view result")
746
747        # print "Querying reduce view, with ?group=false, after removing replica partitions [4, 5]"
748        (resp, red_view_result_nogroup) = common.query(self._params, "redview", {"stale": "ok"})
749
750        expected_reduce_value = common.set_doc_count(self._params, [0, 1, 2, 3, 7])
751        self.assertEqual(len(red_view_result_nogroup["rows"]), 1, "Query returned 1 row")
752        self.assertTrue(red_view_result_nogroup["rows"][0]["key"] is None, "Key is null")
753        self.assertEqual(red_view_result_nogroup["rows"][0]["value"], expected_reduce_value,
754                         "Reduce value is %d with ?group=false" % expected_reduce_value)
755
756        # print "Querying reduce view, with ?group=true, after removing replica partitions [4, 5]"
757        (resp, red_view_result_group) = common.query(self._params, "redview", {"group": "true", "stale": "ok"})
758
759        expected_row_count = common.set_doc_count(self._params, [0, 1, 2, 3, 7])
760        self.assertEqual(len(red_view_result_group["rows"]), expected_row_count,
761                         "Query returned %d rows" % expected_row_count)
762        common.test_keys_sorted(red_view_result_group)
763
764        doc_count = common.set_doc_count(self._params, [0, 1, 2, 3, 4, 5, 6, 7])
765        all_keys = {}
766        for row in red_view_result_group["rows"]:
767            all_keys[row["key"]] = row["value"]
768            self.assertEqual(row["value"], 1, "Value for key %d is 1" % row["key"])
769        for key in xrange(1, doc_count, self._params["nparts"]):
770            self.assertTrue(key in all_keys, "Key from partition 0 in view result")
771        for key in xrange(2, doc_count, self._params["nparts"]):
772            self.assertTrue(key in all_keys, "Key from partition 1 in view result")
773        for key in xrange(3, doc_count, self._params["nparts"]):
774            self.assertTrue(key in all_keys, "Key from partition 2 in view result")
775        for key in xrange(4, doc_count, self._params["nparts"]):
776            self.assertTrue(key in all_keys, "Key from partition 3 in view result")
777        for key in xrange(5, doc_count, self._params["nparts"]):
778            self.assertFalse(key in all_keys, "Key from partition 4 not in view result")
779        for key in xrange(6, doc_count, self._params["nparts"]):
780            self.assertFalse(key in all_keys, "Key from partition 5 not in view result")
781        for key in xrange(7, doc_count, self._params["nparts"]):
782            self.assertFalse(key in all_keys, "Key from partition 6 not in view result")
783        for key in xrange(8, doc_count, self._params["nparts"]):
784            self.assertTrue(key in all_keys, "Key from partition 7 in view result")
785
786        # print "Marking partitions [7] for cleanup in the main group"
787        common.set_partition_states(self._params, cleanup = [7])
788
789        # print "Waiting for cleanup (compaction) to finish"
790        common.compact_set_view(self._params)
791
792        info = common.get_set_view_info(self._params)
793        self.assertEqual(info["active_partitions"], [0, 1, 2, 3], "right active partitions list")
794        self.assertEqual(info["passive_partitions"], [], "right passive partitions list")
795        self.assertEqual(info["cleanup_partitions"], [], "right cleanup partitions list")
796        self.assertEqual(info["replica_partitions"], [6], "main group has [6] as replica partitions")
797        self.assertEqual(info["replicas_on_transfer"], [], "main group has [] as replicas on transfer")
798        self.assertEqual(info["replica_group_info"]["passive_partitions"], [6],
799                          "replica group has [6] as passive partitions")
800        self.assertEqual(info["replica_group_info"]["active_partitions"], [],
801                          "replica group has [] active partitions")
802
803        # print "Querying map view after removing replica partitions [7]"
804        (resp, view_result) = common.query(self._params, "mapview")
805
806        expected_row_count = common.set_doc_count(self._params, [0, 1, 2, 3])
807        self.assertEqual(len(view_result["rows"]), expected_row_count,
808                         "Query returned %d rows" % expected_row_count)
809        common.test_keys_sorted(view_result)
810
811        doc_count = common.set_doc_count(self._params, [0, 1, 2, 3, 4, 5, 6, 7])
812        all_keys = {}
813        for row in view_result["rows"]:
814            all_keys[row["key"]] = row["value"]
815        for key in xrange(1, doc_count, self._params["nparts"]):
816            self.assertTrue(key in all_keys, "Key from partition 0 in view result")
817        for key in xrange(2, doc_count, self._params["nparts"]):
818            self.assertTrue(key in all_keys, "Key from partition 1 in view result")
819        for key in xrange(3, doc_count, self._params["nparts"]):
820            self.assertTrue(key in all_keys, "Key from partition 2 in view result")
821        for key in xrange(4, doc_count, self._params["nparts"]):
822            self.assertTrue(key in all_keys, "Key from partition 3 in view result")
823        for key in xrange(5, doc_count, self._params["nparts"]):
824            self.assertFalse(key in all_keys, "Key from partition 4 not in view result")
825        for key in xrange(6, doc_count, self._params["nparts"]):
826            self.assertFalse(key in all_keys, "Key from partition 5 not in view result")
827        for key in xrange(7, doc_count, self._params["nparts"]):
828            self.assertFalse(key in all_keys, "Key from partition 6 not in view result")
829        for key in xrange(8, doc_count, self._params["nparts"]):
830            self.assertFalse(key in all_keys, "Key from partition 7 not in view result")
831
832        # print "Querying reduce view, with ?group=false, after removing replica partitions [7]"
833        (resp, red_view_result_nogroup) = common.query(self._params, "redview", {"stale": "ok"})
834
835        expected_reduce_value = common.set_doc_count(self._params, [0, 1, 2, 3])
836        self.assertEqual(len(red_view_result_nogroup["rows"]), 1, "Query returned 1 row")
837        self.assertTrue(red_view_result_nogroup["rows"][0]["key"] is None, "Key is null")
838        self.assertEqual(red_view_result_nogroup["rows"][0]["value"], expected_reduce_value,
839                         "Reduce value is %d with ?group=false" % expected_reduce_value)
840
841        # print "Querying reduce view, with ?group=true, after removing replica partitions [7]"
842        (resp, red_view_result_group) = common.query(self._params, "redview", {"group": "true", "stale": "ok"})
843
844        expected_row_count = common.set_doc_count(self._params, [0, 1, 2, 3])
845        self.assertEqual(len(red_view_result_group["rows"]), expected_row_count,
846                         "Query returned %d rows" % expected_row_count)
847        common.test_keys_sorted(red_view_result_group)
848
849        doc_count = common.set_doc_count(self._params, [0, 1, 2, 3, 4, 5, 6, 7])
850        all_keys = {}
851        for row in red_view_result_group["rows"]:
852            all_keys[row["key"]] = row["value"]
853            self.assertEqual(row["value"], 1, "Value for key %d is 1" % row["key"])
854        for key in xrange(1, doc_count, self._params["nparts"]):
855            self.assertTrue(key in all_keys, "Key from partition 0 in view result")
856        for key in xrange(2, doc_count, self._params["nparts"]):
857            self.assertTrue(key in all_keys, "Key from partition 1 in view result")
858        for key in xrange(3, doc_count, self._params["nparts"]):
859            self.assertTrue(key in all_keys, "Key from partition 2 in view result")
860        for key in xrange(4, doc_count, self._params["nparts"]):
861            self.assertTrue(key in all_keys, "Key from partition 3 in view result")
862        for key in xrange(5, doc_count, self._params["nparts"]):
863            self.assertFalse(key in all_keys, "Key from partition 4 not in view result")
864        for key in xrange(6, doc_count, self._params["nparts"]):
865            self.assertFalse(key in all_keys, "Key from partition 5 not in view result")
866        for key in xrange(7, doc_count, self._params["nparts"]):
867            self.assertFalse(key in all_keys, "Key from partition 6 not in view result")
868        for key in xrange(8, doc_count, self._params["nparts"]):
869            self.assertFalse(key in all_keys, "Key from partition 7 not in view result")
870
871        # print "defining partitions [4, 5, 7] as replicas again"
872        common.add_replica_partitions(self._params, [4, 5, 7])
873
874        info = common.get_set_view_info(self._params)
875        self.assertEqual(info["active_partitions"], [0, 1, 2, 3], "right active partitions list")
876        self.assertEqual(info["passive_partitions"], [], "right passive partitions list")
877        self.assertEqual(info["cleanup_partitions"], [], "right cleanup partitions list")
878        self.assertEqual(info["replica_partitions"], [4, 5, 6, 7],
879                         "main group has [4, 5, 6, 7] as replica partitions")
880        self.assertEqual(info["replicas_on_transfer"], [],
881                         "main group has [] as replicas on transfer")
882        self.assertEqual(info["replica_group_info"]["passive_partitions"], [4, 5, 6, 7],
883                          "replica group has [4, 5, 6, 7] as passive partitions")
884        self.assertEqual(info["replica_group_info"]["active_partitions"], [],
885                          "replica group has no active partitions")
886        self.assertEqual(info["replica_group_info"]["cleanup_partitions"], [],
887                          "replica group has no cleanup partitions")
888
889        # print "Querying map view after marking partitions [4, 5, 7] as replicas (not yet active)"
890        (resp, view_result) = common.query(self._params, "mapview")
891
892        expected_row_count = common.set_doc_count(self._params, [0, 1, 2, 3])
893        self.assertEqual(len(view_result["rows"]), expected_row_count,
894                         "Query returned %d rows" % expected_row_count)
895        common.test_keys_sorted(view_result)
896
897        doc_count = common.set_doc_count(self._params, [0, 1, 2, 3, 4, 5, 6, 7])
898        all_keys = {}
899        for row in view_result["rows"]:
900            all_keys[row["key"]] = row["value"]
901        for key in xrange(1, doc_count, self._params["nparts"]):
902            self.assertTrue(key in all_keys, "Key from partition 0 in view result")
903        for key in xrange(2, doc_count, self._params["nparts"]):
904            self.assertTrue(key in all_keys, "Key from partition 1 in view result")
905        for key in xrange(3, doc_count, self._params["nparts"]):
906            self.assertTrue(key in all_keys, "Key from partition 2 in view result")
907        for key in xrange(4, doc_count, self._params["nparts"]):
908            self.assertTrue(key in all_keys, "Key from partition 3 in view result")
909        for key in xrange(5, doc_count, self._params["nparts"]):
910            self.assertFalse(key in all_keys, "Key from partition 4 not in view result")
911        for key in xrange(6, doc_count, self._params["nparts"]):
912            self.assertFalse(key in all_keys, "Key from partition 5 not in view result")
913        for key in xrange(7, doc_count, self._params["nparts"]):
914            self.assertFalse(key in all_keys, "Key from partition 6 not in view result")
915        for key in xrange(8, doc_count, self._params["nparts"]):
916            self.assertFalse(key in all_keys, "Key from partition 7 not in view result")
917
918        # print "Querying reduce view, with ?group=false, after marking partitions [4, 5, 7] as replicas (not yet active)"
919        (resp, red_view_result_nogroup) = common.query(self._params, "redview", {"stale": "ok"})
920
921        expected_reduce_value = common.set_doc_count(self._params, [0, 1, 2, 3])
922        self.assertEqual(len(red_view_result_nogroup["rows"]), 1, "Query returned 1 row")
923        self.assertTrue(red_view_result_nogroup["rows"][0]["key"] is None, "Key is null")
924        self.assertEqual(red_view_result_nogroup["rows"][0]["value"], expected_reduce_value,
925                         "Reduce value is %d with ?group=false" % expected_reduce_value)
926
927        # print "Querying reduce view, with ?group=true, after marking partitions [4, 5, 7] as replicas (not yet active)"
928        (resp, red_view_result_group) = common.query(self._params, "redview", {"group": "true", "stale": "ok"})
929
930        expected_row_count = common.set_doc_count(self._params, [0, 1, 2, 3])
931        self.assertEqual(len(red_view_result_group["rows"]), expected_row_count,
932                         "Query returned %d rows" % expected_row_count)
933        common.test_keys_sorted(red_view_result_group)
934
935        doc_count = common.set_doc_count(self._params, [0, 1, 2, 3, 4, 5, 6, 7])
936        all_keys = {}
937        for row in red_view_result_group["rows"]:
938            all_keys[row["key"]] = row["value"]
939            self.assertEqual(row["value"], 1, "Value for key %d is 1" % row["key"])
940        for key in xrange(1, doc_count, self._params["nparts"]):
941            self.assertTrue(key in all_keys, "Key from partition 0 in view result")
942        for key in xrange(2, doc_count, self._params["nparts"]):
943            self.assertTrue(key in all_keys, "Key from partition 1 in view result")
944        for key in xrange(3, doc_count, self._params["nparts"]):
945            self.assertTrue(key in all_keys, "Key from partition 2 in view result")
946        for key in xrange(4, doc_count, self._params["nparts"]):
947            self.assertTrue(key in all_keys, "Key from partition 3 in view result")
948        for key in xrange(5, doc_count, self._params["nparts"]):
949            self.assertFalse(key in all_keys, "Key from partition 4 not in view result")
950        for key in xrange(6, doc_count, self._params["nparts"]):
951            self.assertFalse(key in all_keys, "Key from partition 5 not in view result")
952        for key in xrange(7, doc_count, self._params["nparts"]):
953            self.assertFalse(key in all_keys, "Key from partition 6 not in view result")
954        for key in xrange(8, doc_count, self._params["nparts"]):
955            self.assertFalse(key in all_keys, "Key from partition 7 not in view result")
956
957        # print "Marking partitions [4, 5, 7] as active (to be transferred from replica to main index)"
958        common.set_partition_states(self._params, active = [4, 5, 7])
959
960        info = common.get_set_view_info(self._params)
961        self.assertEqual(info["active_partitions"], [0, 1, 2, 3], "right active partitions list")
962        self.assertEqual(info["passive_partitions"], [4, 5, 7], "right passive partitions list")
963        self.assertEqual(info["cleanup_partitions"], [], "right cleanup partitions list")
964        self.assertEqual(info["replica_partitions"], [4, 5, 6, 7],
965                         "main group has [4, 5, 6, 7] as replica partitions")
966        self.assertEqual(info["replicas_on_transfer"], [4, 5, 7],
967                         "main group has [4, 5, 7] as replicas on transfer")
968        self.assertEqual(info["replica_group_info"]["passive_partitions"], [6],
969                          "replica group has [6] as passive partitions")
970        self.assertEqual(info["replica_group_info"]["active_partitions"], [4, 5, 7],
971                          "replica group has [4, 5, 7] as active partitions")
972        self.assertEqual(info["replica_group_info"]["cleanup_partitions"], [],
973                          "replica group has no cleanup partitions")
974
975        # print "Querying view with ?stale=update_after to trigger transferral of" + \
976        #     " replica partitions [4, 5, 7] from replica index to main index"
977
978        updates_before = info["stats"]["full_updates"]
979
980        (resp, view_result) = common.query(self._params, "mapview", {"stale": "update_after", "limit": "1"})
981
982        # print "marking partitions [4, 5] for cleanup while partitions [4, 5, 7] are being transferred from replica to main group"
983        common.set_partition_states(self._params, cleanup = [4, 5])
984
985        info = common.get_set_view_info(self._params)
986        self.assertEqual(info["active_partitions"], [0, 1, 2, 3], "right active partitions list")
987        self.assertEqual(info["passive_partitions"], [7], "right passive partitions list")
988        # Not tested due to race conditions
989        # self.assertEqual(info["cleanup_partitions"], [4, 5], "right cleanup partitions list")
990        self.assertEqual(info["replica_partitions"], [6, 7],
991                         "main group has [6, 7] as replica partitions")
992        self.assertEqual(info["replicas_on_transfer"], [7],
993                         "main group has [7] as replicas on transfer")
994        self.assertEqual(info["replica_group_info"]["passive_partitions"], [6],
995                          "replica group has [6] as passive partitions")
996        self.assertEqual(info["replica_group_info"]["active_partitions"], [7],
997                          "replica group has [7] as active partitions")
998        # Not tested due to race conditions
999        # self.assertEqual(info["replica_group_info"]["cleanup_partitions"], [4, 5],
1000        #                  "replica group has [4, 5] as cleanup partitions")
1001
1002        # print "Waiting for transferral of replica partitions [7] into main index"
1003        info = common.get_set_view_info(self._params)
1004        seconds = 0
1005        while info["stats"]["full_updates"] < (updates_before + 1):
1006            time.sleep(5)
1007            seconds += 5
1008            if seconds > 900:
1009                raise(Exception("timeout waiting for transfer of replica partitions [7]"))
1010            info = common.get_set_view_info(self._params)
1011
1012        self.assertEqual(info["active_partitions"], [0, 1, 2, 3, 7], "right active partitions list")
1013        self.assertEqual(info["passive_partitions"], [], "right passive partitions list")
1014        self.assertEqual(info["cleanup_partitions"], [], "right cleanup partitions list")
1015        self.assertEqual(info["replica_partitions"], [6], "main group has [6] as replica partitions")
1016        self.assertEqual(info["replicas_on_transfer"], [], "main group has [] as replicas on transfer")
1017        self.assertEqual(info["replica_group_info"]["passive_partitions"], [6],
1018                          "replica group has [6] as passive partitions")
1019        self.assertEqual(info["replica_group_info"]["active_partitions"], [],
1020                          "replica group has [] as active partitions")
1021        self.assertEqual(sorted(info["update_seqs"].keys()), ["0", "1", "2", "3", "7"],
1022                         "Right list of update seqs for main group")
1023        self.assertEqual(sorted(info["replica_group_info"]["update_seqs"].keys()), ["6"],
1024                         "Right list of update seqs for replica group")
1025        for i in [0, 1, 2, 3, 7]:
1026            expected_seq = common.partition_update_seq(self._params, i)
1027            self.assertEqual(info["update_seqs"][str(i)], expected_seq,
1028                             "right update seq number (%d) for partition %d in main group" % (expected_seq, i + 1))
1029        for i in [6]:
1030            expected_seq = common.partition_update_seq(self._params, i)
1031            self.assertEqual(info["replica_group_info"]["update_seqs"][str(i)], expected_seq,
1032                             "right update seq number (%d) for partition %d in replica group" % (expected_seq, i + 1))
1033
1034        # print "Querying map view after marking partitions [4, 5] for cleanup"
1035        (resp, view_result) = common.query(self._params, "mapview")
1036
1037        expected_row_count = common.set_doc_count(self._params, [0, 1, 2, 3, 7])
1038        self.assertEqual(len(view_result["rows"]), expected_row_count,
1039                         "Query returned %d rows" % expected_row_count)
1040        common.test_keys_sorted(view_result)
1041
1042        doc_count = common.set_doc_count(self._params, [0, 1, 2, 3, 4, 5, 6, 7])
1043        all_keys = {}
1044        for row in view_result["rows"]:
1045            all_keys[row["key"]] = row["value"]
1046        for key in xrange(1, doc_count, self._params["nparts"]):
1047            self.assertTrue(key in all_keys, "Key from partition 0 in view result")
1048        for key in xrange(2, doc_count, self._params["nparts"]):
1049            self.assertTrue(key in all_keys, "Key from partition 1 in view result")
1050        for key in xrange(3, doc_count, self._params["nparts"]):
1051            self.assertTrue(key in all_keys, "Key from partition 2 in view result")
1052        for key in xrange(4, doc_count, self._params["nparts"]):
1053            self.assertTrue(key in all_keys, "Key from partition 3 in view result")
1054        for key in xrange(5, doc_count, self._params["nparts"]):
1055            self.assertFalse(key in all_keys, "Key from partition 4 not in view result")
1056        for key in xrange(6, doc_count, self._params["nparts"]):
1057            self.assertFalse(key in all_keys, "Key from partition 5 not in view result")
1058        for key in xrange(7, doc_count, self._params["nparts"]):
1059            self.assertFalse(key in all_keys, "Key from partition 6 not in view result")
1060        for key in xrange(8, doc_count, self._params["nparts"]):
1061            self.assertTrue(key in all_keys, "Key from partition 7 in view result")
1062
1063        # print "Querying reduce view, with ?group=false, after marking partitions [4, 5] for cleanup"
1064        (resp, red_view_result_nogroup) = common.query(self._params, "redview", {"stale": "ok"})
1065
1066        expected_reduce_value = common.set_doc_count(self._params, [0, 1, 2, 3, 7])
1067        self.assertEqual(len(red_view_result_nogroup["rows"]), 1, "Query returned 1 row")
1068        self.assertTrue(red_view_result_nogroup["rows"][0]["key"] is None, "Key is null")
1069        self.assertEqual(red_view_result_nogroup["rows"][0]["value"], expected_reduce_value,
1070                         "Reduce value is %d with ?group=false" % expected_reduce_value)
1071
1072        # print "Querying reduce view, with ?group=true, after marking partitions [4, 5] for cleanup"
1073        (resp, red_view_result_group) = common.query(self._params, "redview", {"group": "true", "stale": "ok"})
1074
1075        expected_row_count = common.set_doc_count(self._params, [0, 1, 2, 3, 7])
1076        self.assertEqual(len(red_view_result_group["rows"]), expected_row_count,
1077                         "Query returned %d rows" % expected_row_count)
1078        common.test_keys_sorted(red_view_result_group)
1079
1080        doc_count = common.set_doc_count(self._params, [0, 1, 2, 3, 4, 5, 6, 7])
1081        all_keys = {}
1082        for row in red_view_result_group["rows"]:
1083            all_keys[row["key"]] = row["value"]
1084            self.assertEqual(row["value"], 1, "Value for key %d is 1" % row["key"])
1085        for key in xrange(1, doc_count, self._params["nparts"]):
1086            self.assertTrue(key in all_keys, "Key from partition 0 in view result")
1087        for key in xrange(2, doc_count, self._params["nparts"]):
1088            self.assertTrue(key in all_keys, "Key from partition 1 in view result")
1089        for key in xrange(3, doc_count, self._params["nparts"]):
1090            self.assertTrue(key in all_keys, "Key from partition 2 in view result")
1091        for key in xrange(4, doc_count, self._params["nparts"]):
1092            self.assertTrue(key in all_keys, "Key from partition 3 in view result")
1093        for key in xrange(5, doc_count, self._params["nparts"]):
1094            self.assertFalse(key in all_keys, "Key from partition 4 not in view result")
1095        for key in xrange(6, doc_count, self._params["nparts"]):
1096            self.assertFalse(key in all_keys, "Key from partition 5 not in view result")
1097        for key in xrange(7, doc_count, self._params["nparts"]):
1098            self.assertFalse(key in all_keys, "Key from partition 6 not in view result")
1099        for key in xrange(8, doc_count, self._params["nparts"]):
1100            self.assertTrue(key in all_keys, "Key from partition 7 in view result")
1101
1102
1103    def restart_compare_info(self, info_before, info_after):
1104        # print "Restarting server"
1105        time.sleep(1)
1106        common.restart_server(self._params)
1107
1108        del info_before["stats"]
1109        del info_before["replica_group_info"]["stats"]
1110        del info_after["stats"]
1111        del info_after["replica_group_info"]["stats"]
1112        self.assertEqual(info_after, info_before, "same index state after server restart")
1113
1114
1115    def wait_for_pending_transition_applied(self):
1116        # print "Waiting for pending transition to be applied"
1117        iterations = 0
1118        while True:
1119            if iterations > 600:
1120                raise(Exception("timeout waiting for pending transition to be applied"))
1121            info = common.get_set_view_info(self._params)
1122            if (info["pending_transition"] is None) and \
1123                (info["replica_group_info"]["pending_transition"] is None):
1124                break
1125            else:
1126                time.sleep(1)
1127                iterations += 1
1128