1from threading import Thread
2import threading
3from lib.testconstants import STANDARD_BUCKET_PORT
4from couchbase_helper.document import DesignDocument, View
5from basetestcase import BaseTestCase
6from rebalance.rebalance_base import RebalanceBaseTest
7from membase.api.rest_client import RestConnection, RestHelper
8
9
10class VolumeTests(BaseTestCase):
11    def setUp(self):
12        super(VolumeTests, self).setUp()
13        self.zone = self.input.param("zone", 1)
14        self.recoveryType = self.input.param("recoveryType", "full")
15        self.ddocs = []
16        self.default_view_name = "upgrade-test-view"
17        self.ddocs_num = self.input.param("ddocs-num", 0)
18        self.view_num = self.input.param("view-per-ddoc", 2)
19        self.is_dev_ddoc = self.input.param("is-dev-ddoc", False)
20        self.rate_limit = self.input.param("rate_limit", 100000)
21        self.batch_size = self.input.param("batch_size", 10000)
22        self.doc_size = self.input.param("doc_size", 100)
23        self.loader = self.input.param("loader", "pillowfight")
24        self.instances = self.input.param("instances", 1)
25        self.node_out = self.input.param("node_out", 0)
26        self.threads = self.input.param("threads", 5)
27        self.use_replica_to = self.input.param("use_replica_to",False)
28        self.reload_size = self.input.param("reload_size",50000)
29        self.initial_load= self.input.param("initial_load",10000)
30
    def tearDown(self):
        """Delegate all cleanup (cluster reset, bucket deletion) to BaseTestCase."""
        super(VolumeTests, self).tearDown()
33
34    def load(self, server, items, bucket,start_at=0,batch=1000):
35        import subprocess
36        from lib.testconstants import COUCHBASE_FROM_SPOCK
37        rest = RestConnection(server)
38        num_cycles = int((items / batch )) / 5
39        cmd = "cbc-pillowfight -U couchbase://{0}/{3} -I {1} -m 10 -M 100 -B {2} --populate-only --start-at {4} --json".format(server.ip, items, batch,bucket,start_at)
40        if rest.get_nodes_version()[:5] in COUCHBASE_FROM_SPOCK:
41            cmd += " -u Administrator -P password"
42        self.log.info("Executing '{0}'...".format(cmd))
43        rc = subprocess.call(cmd, shell=True)
44        if rc != 0:
45            self.fail("Exception running cbc-pillowfight: subprocess module returned non-zero response!")
46
47    def check_dataloss(self, server, bucket, num_items):
48        from couchbase.bucket import Bucket
49        from couchbase.exceptions import NotFoundError,CouchbaseError
50        from lib.memcached.helper.data_helper import VBucketAwareMemcached
51        self.log.info("########## validating data for bucket : {} ###########".format(bucket))
52        cb_version= cb_version = RestConnection(server).get_nodes_version()[:3]
53        if cb_version < "5":
54            bkt = Bucket('couchbase://{0}/{1}'.format(server.ip, bucket.name),timeout=5000)
55        else:
56            bkt = Bucket('couchbase://{0}/{1}'.format(server.ip, bucket.name),username=server.rest_username,
57                         password=server.rest_password,timeout=5000)
58        rest = RestConnection(self.master)
59        VBucketAware = VBucketAwareMemcached(rest, bucket.name)
60        _, _, _ = VBucketAware.request_map(rest, bucket.name)
61        batch_start = 0
62        batch_end = 0
63        batch_size = 10000
64        errors = []
65        while num_items > batch_end:
66            batch_end = batch_start + batch_size
67            keys = []
68            for i in xrange(batch_start, batch_end, 1):
69                keys.append(str(i).rjust(20, '0'))
70            try:
71                bkt.get_multi(keys)
72                self.log.info("Able to fetch keys starting from {0} to {1}".format(keys[0], keys[len(keys) - 1]))
73            except CouchbaseError as e:
74                self.log.error(e)
75                ok, fail = e.split_results()
76                if fail:
77                    for key in fail:
78                        try:
79                            bkt.get(key)
80                        except NotFoundError:
81                            vBucketId = VBucketAware._get_vBucket_id(key)
82                            errors.append("Missing key: {0}, VBucketId: {1}".
83                                          format(key, vBucketId))
84            batch_start += batch_size
85        self.log.info("Total missing keys:{}".format(len(errors)))
86        self.log.info(errors)
87        return errors
88
89    def create_ddocs_and_views(self):
90        self.default_view = View(self.default_view_name, None, None)
91        for bucket in self.buckets:
92            for i in xrange(int(self.ddocs_num)):
93                views = self.make_default_views(self.default_view_name, self.view_num,
94                                               self.is_dev_ddoc, different_map=True)
95                ddoc = DesignDocument(self.default_view_name + str(i), views)
96                self.ddocs.append(ddoc)
97                for view in views:
98                    self.cluster.create_view(self.master, ddoc.name, view, bucket=bucket)
99
    def test_volume_with_rebalance(self):
        """Volume test: keep loading documents (cbc-pillowfight, one thread per
        bucket) while driving a long sequence of topology changes — rebalance
        in/out, swap, multi-node in/out, zone shuffle, graceful failover with
        add-back — and validate after each step that no keys were lost.

        Each load phase appends self.num_items docs starting where the previous
        phase ended, so after phase k each bucket should hold num_items*k keys
        (the multiplier passed to check_dataloss).
        """
        self.src_bucket = RestConnection(self.master).get_buckets()
        rest = RestConnection(self.master)
        bucket = rest.get_buckets()
        #load initial documents
        self.create_ddocs_and_views()
        load_thread=[]
        for b in bucket:
            load_thread.append(Thread(target=self.load, args=(self.master, self.num_items,b)))
        for t in load_thread:
            t.start()
        servers_init = self.servers[:self.nodes_init]
        # NOTE(review): this initial new_server_list is overwritten before it
        # is ever read (recomputed after the rebalance-out below).
        new_server_list=self.servers[0:self.nodes_init]
        for t in load_thread:
            t.join()
        self.sleep(30)
        #Reload more data for mutations
        load_thread=[]
        for b in bucket:
            load_thread.append(Thread(target=self.load, args=(self.master, self.num_items,b,self.num_items)))
        for t in load_thread:
            t.start()
        # #Rebalance in 1 node
        self.log.info("==========rebalance in 1 node=========")
        servers_in=self.servers[self.nodes_init:self.nodes_init + 1]
        rebalance = self.cluster.async_rebalance(servers_init,
                                                 servers_in, [])

        rebalance.result()
        for t in load_thread:
            t.join()
        for b in bucket:
         self.check_dataloss(self.master, b,self.num_items*2)
        # load more document
        load_thread = []
        for b in bucket:
            load_thread.append(Thread(target=self.load, args=(self.master, self.num_items, b,self.num_items*2)))
        for t in load_thread:
            t.start()
        #rebalance out 1 node
        new_server_list = self.servers[0:self.nodes_init]+ servers_in
        self.log.info("==========rebalance out 1 node=========")
        servers_out=[self.servers[self.nodes_init]]
        rebalance = self.cluster.async_rebalance(servers_init,[],
                                                 servers_out)
        rebalance.result()
        for t in load_thread:
            t.join()
        for b in bucket:
         self.check_dataloss(self.master, b,self.num_items*3)
        self.sleep(30)
         # load more document
        load_thread = []
        for b in bucket:
            load_thread.append(Thread(target=self.load, args=(self.master, self.num_items, b, self.num_items*3)))
        for t in load_thread:
            t.start()
        new_server_list=list(set(new_server_list)- set(servers_out))
        #swap rebalance 1 node
        self.log.info("==========swap rebalance 1 node=========")
        servers_in = self.servers[self.nodes_init : self.nodes_init + 1]
        servers_init = self.servers[:self.nodes_init]
        servers_out = self.servers[(self.nodes_init - 1) : self.nodes_init]

        # NOTE(review): from here on, servers_init is always the original
        # first nodes_init servers even though cluster membership has changed;
        # the rebalance helper presumably only uses it to reach the cluster —
        # confirm against cluster.async_rebalance.
        rebalance = self.cluster.async_rebalance(servers_init, servers_in, servers_out)
        rebalance.result()
        for t in load_thread:
            t.join()
        self.sleep(30)
        for b in bucket:
         self.check_dataloss(self.master, b,self.num_items*4)
        # load more document
        load_thread = []
        for b in bucket:
            load_thread.append(Thread(target=self.load, args=(self.master, self.num_items, b, self.num_items*4)))
        for t in load_thread:
            t.start()
        # new_server_list tracks current cluster membership across steps.
        new_server_list=list(set(new_server_list + servers_in) - set(servers_out))
        self.log.info("==========Rebalance out of 2 nodes and Rebalance In 1 node=========")
        # Rebalance out of 2 nodes and Rebalance In 1 node
        servers_in = [list(set(self.servers) - set(new_server_list))[0]]
        servers_out = list(set(new_server_list) - set([self.master]))[-2:]
        rebalance = self.cluster.async_rebalance(servers_init, servers_in, servers_out)
        rebalance.result()
        for t in load_thread:
            t.join()
        for b in bucket:
         self.check_dataloss(self.master, b,self.num_items*5)
        self.sleep(30)
        # load more document
        load_thread = []
        for b in bucket:
            load_thread.append(Thread(target=self.load, args=(self.master, self.num_items, b, self.num_items*5)))
        for t in load_thread:
            t.start()
        new_server_list=list(set(new_server_list + servers_in) - set(servers_out))
        self.log.info("==========Rebalance out of 1 nodes and Rebalance In 2 nodes=========")
        #Rebalance out of 1 nodes and Rebalance In 2 nodes
        servers_in = list(set(self.servers) - set(new_server_list))[0:2]
        servers_out = list(set(new_server_list) - set([self.master]))[0:1]
        rebalance = self.cluster.async_rebalance(servers_init, servers_in, servers_out)
        rebalance.result()
        for t in load_thread:
            t.join()
        for b in bucket:
         self.check_dataloss(self.master, b,self.num_items*6)
        self.sleep(30)
        # load more document
        load_thread = []
        for b in bucket:
            load_thread.append(Thread(target=self.load, args=(self.master, self.num_items, b, self.num_items*6)))
        for t in load_thread:
            t.start()
        new_server_list=list(set(new_server_list + servers_in) - set(servers_out))
        self.log.info("==========Rebalance in 4 nodes =========")
        #Rebalance in 4 nodes
        servers_in = list(set(self.servers) - set(new_server_list))[0:4]
        rebalance = self.cluster.async_rebalance(servers_init, servers_in, [])
        rebalance.result()
        for t in load_thread:
            t.join()
        for b in bucket:
         self.check_dataloss(self.master, b,self.num_items*7)
        self.sleep(30)
        # load more document
        load_thread = []
        for b in bucket:
            load_thread.append(Thread(target=self.load, args=(self.master, self.num_items, b, self.num_items*7)))
        for t in load_thread:
            t.start()
        new_server_list=list(set(new_server_list + servers_in))
        self.log.info("==========Rebalance out 4 nodes =========")
        #Rebalance out 4 nodes
        servers_out = list(set(new_server_list) - set([self.master]))[0:4]
        rebalance = self.cluster.async_rebalance(servers_init, [], servers_out)
        rebalance.result()
        for t in load_thread:
            t.join()
        for b in bucket:
         self.check_dataloss(self.master, b,self.num_items*8)
        self.sleep(30)
        # load more document
        load_thread = []
        for b in bucket:
            load_thread.append(Thread(target=self.load, args=(self.master, self.num_items, b, self.num_items*8)))
        for t in load_thread:
            t.start()
        new_server_list = list(set(new_server_list) - set(servers_out))
        self.log.info("======Rebalance in 4 nodes (8 nodes) wait for rebalance to finish and move between server groups=========")
        #Rebalance in 4 nodes (8 nodes) wait for rebalance to finish and move between server groups
        servers_in = list(set(self.servers) - set(new_server_list))[0:4]
        rebalance = self.cluster.async_rebalance(servers_init, servers_in, [])
        rebalance.result()
        for t in load_thread:
            t.join()
        for b in bucket:
         self.check_dataloss(self.master, b,self.num_items*9)
        self.sleep(30)
        load_thread = []
        for b in bucket:
            load_thread.append(Thread(target=self.load, args=(self.master, self.num_items, b, self.num_items * 9)))
        for t in load_thread:
            t.start()
        # Move nodes between server groups (zones) and rebalance, while loading.
        self.shuffle_nodes_between_zones_and_rebalance()
        for t in load_thread:
            t.join()
        for b in bucket:
         self.check_dataloss(self.master, b,self.num_items*10)
        self.sleep(30)
        load_thread = []
        for b in bucket:
            load_thread.append(Thread(target=self.load, args=(self.master, self.num_items, b, self.num_items * 10)))
        for t in load_thread:
            t.start()
        self.log.info("======Graceful failover 1 KV node and add back(Delta and Full)=========")
        #Graceful failover 1 KV node and add back(Delta and Full)
        kv_server = self.get_nodes_from_services_map(service_type="kv", get_all_nodes=False)
        fail_over_task = self.cluster.async_failover([self.master], failover_nodes=[kv_server], graceful=True)
        fail_over_task.result()
        self.sleep(120)
        # do a recovery and rebalance
        rest.set_recovery_type('ns_1@' + kv_server.ip, recoveryType=self.recoveryType)
        rest.add_back_node('ns_1@' + kv_server.ip)
        rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init], [], [])
        rebalance.result()
        for t in load_thread:
            t.join()
        for b in bucket:
         self.check_dataloss(self.master, b,self.num_items*11)
        self.sleep(30)
290
    def test_volume_with_high_ops(self):
        """Volume test using the high-ops loader: load documents concurrently
        with the same topology-change sequence as test_volume_with_rebalance
        (rebalance in/out, swap, multi-node in/out, zone shuffle, graceful
        failover + add-back), validating key counts after each step.

        Unlike the pillowfight variant, each step also keeps loading while the
        rebalance is in flight (load_till_rebalance_progress), and total_doc /
        start_at track the cumulative expected document count and the next key
        offset across phases.
        """
        self.src_bucket = RestConnection(self.master).get_buckets()
        rest = RestConnection(self.master)
        bucket = rest.get_buckets()
        start_at=0
        total_doc=self.num_items
        #load initial documents
        self.create_ddocs_and_views()
        load_thread=[]
        for bk in bucket:
            load_thread.append(Thread(target=self.load_buckets_with_high_ops, args=(self.master,bk, self.num_items,
                                                                                    self.batch_size, self.threads, start_at,self.instances)))
        for t in load_thread:
            t.start()
        for t in load_thread:
            t.join()
        for b in bucket:
         self.check_dataloss_for_high_ops_loader(self.master, b,total_doc,batch=self.batch_size,instances=self.instances)
        self.sleep(30)
        start_at=total_doc
        #Reload more data for mutations
        load_thread=[]
        for b in bucket:
            load_thread.append(Thread(target=self.load_buckets_with_high_ops,
                                      args=(self.master,b, self.num_items,
                                                       self.batch_size, self.threads, start_at,self.instances)))
        for t in load_thread:
            t.start()
        total_doc +=self.num_items
        start_at=total_doc
        servers_init = self.servers[:self.nodes_init]
        # #Rebalance in 1 node
        self.log.info("==========rebalance in 1 node=========")
        servers_in=self.servers[self.nodes_init:self.nodes_init + 1]
        rebalance = self.cluster.async_rebalance(servers_init,
                                                 servers_in, [])
        for t in load_thread:
            t.join()
        for b in bucket:
            num_keys=rest.get_active_key_count(b)
            self.log.info("****** Number of doc in bucket : {}".format(num_keys))
        # Keep loading while the rebalance is still running; returns the
        # updated cumulative doc count and next key offset.
        total_doc,start_at=self.load_till_rebalance_progress(rest,bucket,total_doc,start_at)
        rebalance.result()
        for b in bucket:
         self.check_dataloss_for_high_ops_loader(self.master, b,total_doc,batch=self.batch_size,instances=self.instances)
        # Reload more data for mutations
        load_thread = []
        for b in bucket:
            load_thread.append(Thread(target=self.load_buckets_with_high_ops,
                                      args=(self.master, b, self.num_items, self.batch_size, self.threads, start_at, self.instances)))
        for t in load_thread:
            t.start()
        total_doc +=self.num_items
        start_at=total_doc
        # rebalance out 1 node
        new_server_list = self.servers[0:self.nodes_init] + servers_in
        self.log.info("==========rebalance out 1 node=========")
        servers_out = [self.servers[self.nodes_init]]
        rebalance = self.cluster.async_rebalance(servers_init, [], servers_out)
        for t in load_thread:
            t.join()
        for b in bucket:
            num_keys=rest.get_active_key_count(b)
            self.log.info("****** Number of doc in bucket : {}".format(num_keys))
        total_doc,start_at=self.load_till_rebalance_progress(rest,bucket,total_doc,start_at)
        rebalance.result()
        for b in bucket:
         self.check_dataloss_for_high_ops_loader(self.master, b,total_doc,batch=self.batch_size,instances=self.instances)
        self.sleep(30)
        # Reload more data for mutations
        load_thread = []
        for b in bucket:
            load_thread.append(Thread(target=self.load_buckets_with_high_ops, args=(
            self.master, b, self.num_items, self.batch_size, self.threads, start_at, self.instances)))
        for t in load_thread:
            t.start()
        total_doc +=self.num_items
        start_at=total_doc
        # new_server_list tracks current cluster membership across steps.
        new_server_list = list(set(new_server_list) - set(servers_out))
        # swap rebalance 1 node
        self.log.info("==========swap rebalance 1 node=========")
        servers_in = self.servers[self.nodes_init: self.nodes_init + 1]
        servers_init = self.servers[:self.nodes_init]
        servers_out = self.servers[(self.nodes_init - 1): self.nodes_init]
        rebalance = self.cluster.async_rebalance(servers_init, servers_in, servers_out)
        for t in load_thread:
            t.join()
        for b in bucket:
            num_keys=rest.get_active_key_count(b)
            self.log.info("****** Number of doc in bucket : {}".format(num_keys))
        total_doc,start_at=self.load_till_rebalance_progress(rest,bucket,total_doc,start_at)
        rebalance.result()
        for b in bucket:
            self.check_dataloss_for_high_ops_loader(self.master, b, total_doc, batch=self.batch_size,instances=self.instances)
        self.sleep(30)
        # load more document
        load_thread = []
        for b in bucket:
            load_thread.append(Thread(target=self.load_buckets_with_high_ops, args=(
                self.master, b, self.num_items, self.batch_size, self.threads, start_at, self.instances)))
        for t in load_thread:
            t.start()
        total_doc +=self.num_items
        start_at=total_doc
        new_server_list = list(set(new_server_list + servers_in) - set(servers_out))
        self.log.info("==========Rebalance out of 2 nodes and Rebalance In 1 node=========")
        # Rebalance out of 2 nodes and Rebalance In 1 node
        servers_in = [list(set(self.servers) - set(new_server_list))[0]]
        servers_out = list(set(new_server_list) - set([self.master]))[-2:]
        rebalance = self.cluster.async_rebalance(servers_init, servers_in, servers_out)
        for t in load_thread:
            t.join()
        for b in bucket:
            num_keys=rest.get_active_key_count(b)
            self.log.info("****** Number of doc in bucket : {}".format(num_keys))
        total_doc,start_at=self.load_till_rebalance_progress(rest,bucket,total_doc,start_at)
        rebalance.result()
        for b in bucket:
            self.check_dataloss_for_high_ops_loader(self.master, b, total_doc,batch=self.batch_size,instances=self.instances)
        self.sleep(30)
        # load more document
        load_thread = []
        for b in bucket:
            load_thread.append(Thread(target=self.load_buckets_with_high_ops, args=(
                self.master, b, self.num_items, self.batch_size, self.threads, start_at, self.instances)))
        for t in load_thread:
            t.start()
        total_doc += self.num_items
        start_at = total_doc
        new_server_list = list(set(new_server_list + servers_in) - set(servers_out))
        self.log.info("==========Rebalance out of 1 nodes and Rebalance In 2 nodes=========")
        # Rebalance out of 1 nodes and Rebalance In 2 nodes
        servers_in = list(set(self.servers) - set(new_server_list))[0:2]
        servers_out = list(set(new_server_list) - set([self.master]))[0:1]
        rebalance = self.cluster.async_rebalance(servers_init, servers_in, servers_out)
        for t in load_thread:
            t.join()
        for b in bucket:
            num_keys=rest.get_active_key_count(b)
            self.log.info("****** Number of doc in bucket : {}".format(num_keys))
        total_doc,start_at=self.load_till_rebalance_progress(rest,bucket,total_doc,start_at)
        rebalance.result()
        for b in bucket:
            self.check_dataloss_for_high_ops_loader(self.master, b, total_doc,batch=self.batch_size,instances=self.instances)
        self.sleep(30)
        # load more document
        load_thread = []
        for b in bucket:
            load_thread.append(Thread(target=self.load_buckets_with_high_ops, args=(
                self.master, b, self.num_items, self.batch_size, self.threads, start_at, self.instances)))
        for t in load_thread:
            t.start()
        total_doc += self.num_items
        start_at = total_doc
        new_server_list = list(set(new_server_list + servers_in) - set(servers_out))
        self.log.info("==========Rebalance in 4 nodes =========")
        # Rebalance in 4 nodes
        servers_in = list(set(self.servers) - set(new_server_list))[0:4]
        rebalance = self.cluster.async_rebalance(servers_init, servers_in, [])
        for t in load_thread:
            t.join()
        for b in bucket:
            num_keys=rest.get_active_key_count(b)
            self.log.info("****** Number of doc in bucket : {}".format(num_keys))
        total_doc,start_at=self.load_till_rebalance_progress(rest,bucket,total_doc,start_at)
        rebalance.result()
        for b in bucket:
            self.check_dataloss_for_high_ops_loader(self.master, b, total_doc,batch=self.batch_size,instances=self.instances)
        self.sleep(30)
        # load more document
        load_thread = []
        for b in bucket:
            load_thread.append(Thread(target=self.load_buckets_with_high_ops, args=(
                self.master, b, self.num_items, self.batch_size, self.threads, start_at, self.instances)))
        for t in load_thread:
            t.start()
        total_doc += self.num_items
        start_at = total_doc
        new_server_list = list(set(new_server_list + servers_in))
        self.log.info("==========Rebalance out 4 nodes =========")
        # Rebalance out 4 nodes
        servers_out = list(set(new_server_list) - set([self.master]))[0:4]
        rebalance = self.cluster.async_rebalance(servers_init, [], servers_out)
        for t in load_thread:
            t.join()
        for b in bucket:
            num_keys=rest.get_active_key_count(b)
            self.log.info("****** Number of doc in bucket : {}".format(num_keys))
        total_doc,start_at=self.load_till_rebalance_progress(rest,bucket,total_doc,start_at)
        rebalance.result()
        for b in bucket:
            self.check_dataloss_for_high_ops_loader(self.master, b, total_doc,batch=self.batch_size,instances=self.instances)
        self.sleep(30)
        # load more document
        load_thread = []
        for b in bucket:
            load_thread.append(Thread(target=self.load_buckets_with_high_ops, args=(
                self.master, b, self.num_items, self.batch_size, self.threads, start_at, self.instances)))
        for t in load_thread:
            t.start()
        total_doc += self.num_items
        start_at = total_doc
        new_server_list = list(set(new_server_list) - set(servers_out))
        self.log.info(
            "======Rebalance in 4 nodes (8 nodes) wait for rebalance to finish and move between server groups=========")
        # Rebalance in 4 nodes (8 nodes) wait for rebalance to finish and move between server groups
        servers_in = list(set(self.servers) - set(new_server_list))[0:4]
        rebalance = self.cluster.async_rebalance(servers_init, servers_in, [])
        for t in load_thread:
            t.join()
        for b in bucket:
            num_keys=rest.get_active_key_count(b)
            self.log.info("****** Number of doc in bucket : {}".format(num_keys))
        total_doc,start_at=self.load_till_rebalance_progress(rest,bucket,total_doc,start_at)
        rebalance.result()
        for b in bucket:
            self.check_dataloss_for_high_ops_loader(self.master, b, total_doc,batch=self.batch_size,instances=self.instances)
        self.sleep(30)
        # load more document
        load_thread = []
        for b in bucket:
            load_thread.append(Thread(target=self.load_buckets_with_high_ops, args=(
                self.master, b, self.num_items, self.batch_size, self.threads, start_at, self.instances)))
        for t in load_thread:
            t.start()
        total_doc += self.num_items
        start_at = total_doc
        self.log.info("####### Shuffling zones and rebalance #######")
        self.shuffle_nodes_between_zones_and_rebalance()
        for t in load_thread:
            t.join()
        for b in bucket:
            num_keys=rest.get_active_key_count(b)
            self.log.info("****** Number of doc in bucket : {}".format(num_keys))
        total_doc, start_at = self.load_till_rebalance_progress(rest, bucket, total_doc, start_at)
        for b in bucket:
            self.check_dataloss_for_high_ops_loader(self.master, b, total_doc,batch=self.batch_size,instances=self.instances)
        self.sleep(30)
        # load more document
        load_thread = []
        for b in bucket:
            load_thread.append(Thread(target=self.load_buckets_with_high_ops, args=(
                self.master, b, self.num_items, self.batch_size, self.threads, start_at, self.instances)))
        for t in load_thread:
            t.start()
        total_doc += self.num_items
        start_at = total_doc
        self.log.info("======Graceful failover 1 KV node and add back(Delta and Full)=========")
        # Graceful failover 1 KV node and add back(Delta and Full)
        kv_server = self.get_nodes_from_services_map(service_type="kv", get_all_nodes=False)
        fail_over_task = self.cluster.async_failover([self.master], failover_nodes=[kv_server], graceful=True)
        fail_over_task.result()
        self.sleep(120)
        # do a recovery and rebalance
        rest.set_recovery_type('ns_1@' + kv_server.ip, recoveryType=self.recoveryType)
        rest.add_back_node('ns_1@' + kv_server.ip)
        rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init], [], [])
        for t in load_thread:
            t.join()
        for b in bucket:
            num_keys=rest.get_active_key_count(b)
            self.log.info("****** Number of doc in bucket : {}".format(num_keys))
        total_doc, start_at = self.load_till_rebalance_progress(rest, bucket, total_doc, start_at)
        rebalance.result()
        # Final validation is the only one that fails the test on missing keys;
        # earlier phases only log them.
        for b in bucket:
            errors=self.check_dataloss_for_high_ops_loader(self.master, b, total_doc,batch=self.batch_size,instances=self.instances)
            if len(errors) > 0:
                self.fail("data is missing");
        self.sleep(30)
560
561    def test_volume_with_high_ops_update(self):
562        self.src_bucket = RestConnection(self.master).get_buckets()
563        rest = RestConnection(self.master)
564        bucket = rest.get_buckets()
565        start_at = 0
566        total_doc = self.num_items
567        updated=1
568        # load initial documents
569        self.create_ddocs_and_views()
570        load_thread = []
571        for bk in bucket:
572            load_thread.append(Thread(target=self.load_buckets_with_high_ops, args=(
573            self.master, bk, self.num_items, self.batch_size, self.threads, start_at, self.instances)))
574        for t in load_thread:
575            t.start()
576        for t in load_thread:
577            t.join()
578        for b in bucket:
579            self.check_dataloss_for_high_ops_loader(self.master, b, total_doc,batch=self.batch_size,instances=self.instances)
580        self.sleep(30)
581        #Update all data
582        load_thread=[]
583        for b in bucket:
584            load_thread.append(Thread(target=self.update_buckets_with_high_ops,
585                                      args=(self.master,b, total_doc,total_doc,
586                                                       self.batch_size, self.threads, start_at,self.instances,updated)))
587        for t in load_thread:
588            t.start()
589        servers_init = self.servers[:self.nodes_init]
590        #Rebalance in 1 node
591        self.log.info("==========rebalance in 1 node=========")
592        servers_in=self.servers[self.nodes_init:self.nodes_init + 1]
593        rebalance = self.cluster.async_rebalance(servers_init,
594                                                 servers_in, [])
595        for t in load_thread:
596            t.join()
597        #total_doc,start_at=self.load_till_rebalance_progress(rest,bucket,total_doc,start_at)
598        rebalance.result()
599        for b in bucket:
600            self.check_dataloss_for_high_ops_loader(self.master, b,total_doc,updated=True,ops=total_doc,
601                                                    batch=self.batch_size, instances=self.instances)
602        updated +=1
603        #Update all data
604        load_thread = []
605        for b in bucket:
606         load_thread.append(Thread(target=self.update_buckets_with_high_ops, args=(
607         self.master, b, total_doc, total_doc, self.batch_size, self.threads, start_at, self.instances,updated)))
608        for t in load_thread:
609         t.start()
610        # rebalance out 1 node
611        new_server_list = self.servers[0:self.nodes_init] + servers_in
612        self.log.info("==========rebalance out 1 node=========")
613        servers_out = [self.servers[self.nodes_init]]
614        rebalance = self.cluster.async_rebalance(servers_init, [], servers_out)
615        rebalance.result()
616        for t in load_thread:
617         t.join()
618        for b in bucket:
619         self.check_dataloss_for_high_ops_loader(self.master, b, total_doc, updated=True,
620                                                 ops=total_doc*updated,batch=self.batch_size,instances=self.instances)
621        updated +=1
622        #Update all data
623        load_thread = []
624        for b in bucket:
625         load_thread.append(Thread(target=self.update_buckets_with_high_ops, args=(
626         self.master, b, total_doc, total_doc, self.batch_size, self.threads, start_at, self.instances,updated)))
627        for t in load_thread:
628         t.start()
629        new_server_list = list(set(new_server_list) - set(servers_out))
630        # swap rebalance 1 node
631        self.log.info("==========swap rebalance 1 node=========")
632        servers_in = self.servers[self.nodes_init: self.nodes_init + 1]
633        servers_init = self.servers[:self.nodes_init]
634        servers_out = self.servers[(self.nodes_init - 1): self.nodes_init]
635        rebalance = self.cluster.async_rebalance(servers_init, servers_in, servers_out)
636        rebalance.result()
637        for t in load_thread:
638            t.join()
639        for b in bucket:
640         self.check_dataloss_for_high_ops_loader(self.master, b, total_doc, updated=True,
641                                                 ops=total_doc*updated,batch=self.batch_size,instances=self.instances)
642        updated += 1
643        # Update all data
644        load_thread = []
645        for b in bucket:
646            load_thread.append(Thread(target=self.update_buckets_with_high_ops, args=(
647                self.master, b, total_doc, total_doc, self.batch_size, self.threads, start_at, self.instances,
648                updated)))
649        for t in load_thread:
650            t.start()
651        new_server_list = list(set(new_server_list + servers_in) - set(servers_out))
652        self.log.info("==========Rebalance out of 2 nodes and Rebalance In 1 node=========")
653        # Rebalance out of 2 nodes and Rebalance In 1 node
654        servers_in = [list(set(self.servers) - set(new_server_list))[0]]
655        servers_out = list(set(new_server_list) - set([self.master]))[-2:]
656        rebalance = self.cluster.async_rebalance(servers_init, servers_in, servers_out)
657        rebalance.result()
658        for t in load_thread:
659            t.join()
660        for b in bucket:
661            self.check_dataloss_for_high_ops_loader(self.master, b, total_doc, updated=True,
662                                                    ops=total_doc * updated,batch=self.batch_size,instances=self.instances)
663        updated += 1
664        # Update all data
665        load_thread = []
666        for b in bucket:
667            load_thread.append(Thread(target=self.update_buckets_with_high_ops, args=(
668                self.master, b, total_doc, total_doc, self.batch_size, self.threads, start_at, self.instances,
669                updated)))
670        for t in load_thread:
671            t.start()
672        self.log.info("==========Rebalance out of 1 nodes and Rebalance In 2 nodes=========")
673        new_server_list = list(set(new_server_list + servers_in) - set(servers_out))
674        # Rebalance out of 1 nodes and Rebalance In 2 nodes
675        servers_in = list(set(self.servers) - set(new_server_list))[0:2]
676        servers_out = list(set(new_server_list) - set([self.master]))[0:1]
677        rebalance = self.cluster.async_rebalance(servers_init, servers_in, servers_out)
678        rebalance.result()
679        for t in load_thread:
680            t.join()
681        for b in bucket:
682            self.check_dataloss_for_high_ops_loader(self.master, b, total_doc, updated=True,
683                                                    ops=total_doc * updated,batch=self.batch_size,instances=self.instances)
684        updated += 1
685        # Update all data
686        load_thread = []
687        for b in bucket:
688            load_thread.append(Thread(target=self.update_buckets_with_high_ops, args=(
689                self.master, b, total_doc, total_doc, self.batch_size, self.threads, start_at, self.instances,
690                updated)))
691        for t in load_thread:
692            t.start()
693        new_server_list = list(set(new_server_list + servers_in) - set(servers_out))
694        self.log.info("==========Rebalance in 4 nodes =========")
695        # Rebalance in 4 nodes
696        servers_in = list(set(self.servers) - set(new_server_list))[0:4]
697        rebalance = self.cluster.async_rebalance(servers_init, servers_in, [])
698        rebalance.result()
699        for t in load_thread:
700            t.join()
701        for b in bucket:
702            self.check_dataloss_for_high_ops_loader(self.master, b, total_doc, updated=True,
703                                                    ops=total_doc * updated,batch=self.batch_size,instances=self.instances)
704        updated += 1
705        # Update all data
706        load_thread = []
707        for b in bucket:
708            load_thread.append(Thread(target=self.update_buckets_with_high_ops, args=(
709                self.master, b, total_doc, total_doc, self.batch_size, self.threads, start_at, self.instances,
710                updated)))
711        for t in load_thread:
712            t.start()
713        new_server_list = list(set(new_server_list + servers_in))
714        self.log.info("==========Rebalance out 4 nodes =========")
715        # Rebalance out 4 nodes
716        servers_out = list(set(new_server_list) - set([self.master]))[0:4]
717        rebalance = self.cluster.async_rebalance(servers_init, [], servers_out)
718        rebalance.result()
719        for t in load_thread:
720            t.join()
721        for b in bucket:
722            self.check_dataloss_for_high_ops_loader(self.master, b, total_doc,  updated=True,
723                                                    ops=total_doc * updated, batch=self.batch_size,
724                                                    instances=self.instances)
725        updated += 1
726        # Update all data
727        load_thread = []
728        for b in bucket:
729            load_thread.append(Thread(target=self.update_buckets_with_high_ops, args=(
730                self.master, b, total_doc, total_doc, self.batch_size, self.threads, start_at, self.instances,
731                updated)))
732        for t in load_thread:
733            t.start()
734        new_server_list = list(set(new_server_list) - set(servers_out))
735        self.log.info(
736            "======Rebalance in 4 nodes (8 nodes) wait for rebalance to finish and move between server groups=========")
737        # Rebalance in 4 nodes (8 nodes) wait for rebalance to finish and move between server groups
738        servers_in = list(set(self.servers) - set(new_server_list))[0:4]
739        rebalance = self.cluster.async_rebalance(servers_init, servers_in, [])
740        rebalance.result()
741        for t in load_thread:
742            t.join()
743        for b in bucket:
744            self.check_dataloss_for_high_ops_loader(self.master, b, total_doc, updated=True,
745                                                    ops=total_doc * updated, batch=self.batch_size,
746                                                    instances=self.instances)
747        updated += 1
748        # Update all data
749        load_thread = []
750        for b in bucket:
751            load_thread.append(Thread(target=self.update_buckets_with_high_ops, args=(
752                self.master, b, total_doc, total_doc, self.batch_size, self.threads, start_at, self.instances,
753                updated)))
754        for t in load_thread:
755            t.start()
756        self.log.info("####### Shuffling zones and rebalance #######")
757        self.shuffle_nodes_between_zones_and_rebalance()
758        for t in load_thread:
759            t.join()
760        for b in bucket:
761            self.check_dataloss_for_high_ops_loader(self.master, b, total_doc, updated=True,
762                                                    ops=total_doc * updated, batch=self.batch_size,
763                                                    instances=self.instances)
764        updated += 1
765        # Update all data
766        load_thread = []
767        for b in bucket:
768            load_thread.append(Thread(target=self.update_buckets_with_high_ops, args=(
769                self.master, b, total_doc, total_doc, self.batch_size, self.threads, start_at, self.instances,
770                updated)))
771        for t in load_thread:
772            t.start()
773        self.log.info("======Graceful failover 1 KV node and add back(Delta and Full)=========")
774        # Graceful failover 1 KV node and add back(Delta and Full)
775        kv_server = self.get_nodes_from_services_map(service_type="kv", get_all_nodes=False)
776        fail_over_task = self.cluster.async_failover([self.master], failover_nodes=[kv_server], graceful=True)
777        fail_over_task.result()
778        self.sleep(120)
779        # do a recovery and rebalance
780        rest.set_recovery_type('ns_1@' + kv_server.ip, recoveryType=self.recoveryType)
781        rest.add_back_node('ns_1@' + kv_server.ip)
782        rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init], [], [])
783        rebalance.result()
784        for t in load_thread:
785            t.join()
786        for b in bucket:
787            errors=self.check_dataloss_for_high_ops_loader(self.master, b, total_doc, updated=True,
788                                                    ops=total_doc * updated, batch=self.batch_size,
789                                                           instances=self.instances)
790            if len(errors) > 0:
791                self.fail("data is missing");
792
    def test_volume_with_high_ops_create_update(self):
        """Volume test: continuously UPDATE the initial document set while
        CREATING new documents, across a fixed sequence of cluster topology
        changes, verifying no data loss after each step.

        Flow per step:
          1. spawn one update thread (rewrites docs [0, initial_load)) and one
             create thread (appends num_items docs at start_at) per bucket,
          2. kick off a rebalance / failover,
          3. join the loader threads, then keep loading via
             create_update_till_rebalance_progress() until the rebalance is
             no longer reported as running,
          4. run check_dataloss_for_high_ops_loader on both the updated range
             and the newly created range.

        Bookkeeping:
          - total_doc: total documents expected in each bucket so far,
          - start_at:  key offset where the next create pass begins,
          - updated:   number of update passes applied to the initial docs;
            the checker receives ops=initial_load*(updated-1) as the expected
            mutation count.

        Only the final (post-failover) verification fails the test; earlier
        check_dataloss calls log discrepancies without asserting.
        """
        self.src_bucket = RestConnection(self.master).get_buckets()
        rest = RestConnection(self.master)
        bucket = rest.get_buckets()
        start_at = 0
        # load initial documents
        self.create_ddocs_and_views()
        load_thread = []
        for bk in bucket:
            load_thread.append(Thread(target=self.load_buckets_with_high_ops, args=(
            self.master, bk, self.initial_load, self.batch_size, self.threads, start_at, self.instances)))
        for t in load_thread:
            t.start()
        for t in load_thread:
            t.join()
        # Baseline verification of the initial load before any topology change.
        for b in bucket:
            self.check_dataloss_for_high_ops_loader(self.master, b, self.initial_load,batch=self.batch_size,instances=self.instances)
        self.sleep(30)
        total_doc = self.initial_load
        start_at=total_doc
        #Update initial doc and create more doc
        load_thread=[]
        create_thread=[]
        # First update pass over the initial docs; 'updated' is forwarded to the
        # loader so document bodies carry the pass number.
        updated=1
        for b in bucket:
            load_thread.append(Thread(target=self.update_buckets_with_high_ops,
                                      args=(self.master,b, self.initial_load,self.initial_load,
                                                       self.batch_size, self.threads, 0,self.instances,updated)))
            create_thread.append(Thread(target=self.load_buckets_with_high_ops, args=(
            self.master, b, self.num_items, self.batch_size, self.threads, start_at, self.instances)))
        for t in load_thread:
            t.start()
        for th in create_thread:
            th.start()
        # Counters are advanced optimistically, before the loader threads finish.
        total_doc +=self.num_items
        start_at=total_doc
        updated +=1
        servers_init = self.servers[:self.nodes_init]
        #Rebalance in 1 node
        self.log.info("==========rebalance in 1 node=========")
        servers_in=self.servers[self.nodes_init:self.nodes_init + 1]
        rebalance = self.cluster.async_rebalance(servers_init,
                                                 servers_in, [])
        self.sleep(10)
        for t in load_thread:
            t.join()
        for th in create_thread:
            th.join()
        total_doc,start_at,updated=self.create_update_till_rebalance_progress(rest,bucket,total_doc,start_at,updated)
        rebalance.result()
        for b in bucket:
            # Check the updated range [0, initial_load) and the created range
            # [initial_load, total_doc) separately.
            self.check_dataloss_for_high_ops_loader(self.master, b,self.initial_load,start_document=0,updated=True,
                                                    ops=self.initial_load*(updated-1),batch=self.batch_size,instances=self.instances)
            self.check_dataloss_for_high_ops_loader(self.master,b,total_doc-self.initial_load,
                                                    start_document=self.initial_load,batch=self.batch_size,instances=self.instances)
        # Update initial doc and create more doc
        load_thread = []
        create_thread = []
        for b in bucket:
            load_thread.append(Thread(target=self.update_buckets_with_high_ops, args=(
            self.master, b, self.initial_load, self.initial_load, self.batch_size, self.threads, 0, self.instances, updated)))
            create_thread.append(Thread(target=self.load_buckets_with_high_ops, args=(
                self.master, b, self.num_items, self.batch_size, self.threads, start_at, self.instances)))
        for t in load_thread:
            t.start()
        for th in create_thread:
            th.start()
        total_doc += self.num_items
        start_at = total_doc
        updated +=1
        # rebalance out 1 node
        new_server_list = self.servers[0:self.nodes_init] + servers_in
        self.log.info("==========rebalance out 1 node=========")
        servers_out = [self.servers[self.nodes_init]]
        rebalance = self.cluster.async_rebalance(servers_init, [], servers_out)
        # NOTE(review): result() is awaited here AND again after the loader
        # threads join — the second call below looks redundant; confirm intent.
        rebalance.result()
        self.sleep(5)
        for t in load_thread:
            t.join()
        for th in create_thread:
            th.join()
        total_doc,start_at,updated=self.create_update_till_rebalance_progress(rest,bucket,total_doc,start_at,updated)
        rebalance.result()
        for b in bucket:
            self.check_dataloss_for_high_ops_loader(self.master, b,self.initial_load,start_document=0,updated=True,
                                                    ops=self.initial_load*(updated-1),
                                                    batch=self.batch_size,instances=self.instances)
            self.check_dataloss_for_high_ops_loader(self.master,b,total_doc-self.initial_load,
                                                    start_document=self.initial_load, batch=self.batch_size,
                                                    instances=self.instances)
        # Update initial doc and create more doc
        load_thread = []
        create_thread = []
        for b in bucket:
            load_thread.append(Thread(target=self.update_buckets_with_high_ops, args=(
                self.master, b, self.initial_load, self.initial_load, self.batch_size, self.threads, 0, self.instances,
                updated)))
            create_thread.append(Thread(target=self.load_buckets_with_high_ops, args=(
                self.master, b, self.num_items, self.batch_size, self.threads, start_at, self.instances)))
        for t in load_thread:
            t.start()
        for th in create_thread:
            th.start()
        total_doc += self.num_items
        start_at = total_doc
        updated +=1
        new_server_list = list(set(new_server_list) - set(servers_out))
        # swap rebalance 1 node
        self.log.info("==========swap rebalance 1 node=========")
        servers_in = self.servers[self.nodes_init: self.nodes_init + 1]
        servers_init = self.servers[:self.nodes_init]
        servers_out = self.servers[(self.nodes_init - 1): self.nodes_init]
        rebalance = self.cluster.async_rebalance(servers_init, servers_in, servers_out)
        self.sleep(10)
        for t in load_thread:
            t.join()
        for th in create_thread:
            th.join()
        total_doc, start_at, updated = self.create_update_till_rebalance_progress(rest, bucket, total_doc, start_at,
                                                                                  updated)
        rebalance.result()
        for b in bucket:
            self.check_dataloss_for_high_ops_loader(self.master, b,self.initial_load,start_document=0,updated=True,
                                                    ops=self.initial_load*(updated-1), batch=self.batch_size,
                                                    instances=self.instances)
            self.check_dataloss_for_high_ops_loader(self.master, b, total_doc - self.initial_load,
                                                    start_document=self.initial_load, batch=self.batch_size,
                                                    instances=self.instances)
        # Update initial doc and create more doc
        load_thread = []
        create_thread = []
        for b in bucket:
            load_thread.append(Thread(target=self.update_buckets_with_high_ops, args=(
                self.master, b, self.initial_load, self.initial_load, self.batch_size, self.threads, 0,
                self.instances, updated)))
            create_thread.append(Thread(target=self.load_buckets_with_high_ops, args=(
                self.master, b, self.num_items, self.batch_size, self.threads, start_at, self.instances)))
        for t in load_thread:
            t.start()
        for th in create_thread:
            th.start()
        total_doc += self.num_items
        start_at = total_doc
        updated += 1
        # new_server_list tracks the cluster membership after each step so the
        # next servers_in/servers_out can be derived from it.
        new_server_list = list(set(new_server_list + servers_in) - set(servers_out))
        self.log.info("==========Rebalance out of 2 nodes and Rebalance In 1 node=========")
        # Rebalance out of 2 nodes and Rebalance In 1 node
        servers_in = [list(set(self.servers) - set(new_server_list))[0]]
        servers_out = list(set(new_server_list) - set([self.master]))[-2:]
        rebalance = self.cluster.async_rebalance(servers_init, servers_in, servers_out)
        self.sleep(10)
        for t in load_thread:
            t.join()
        for th in create_thread:
            th.join()
        total_doc, start_at, updated = self.create_update_till_rebalance_progress(rest, bucket, total_doc, start_at,
                                                                                  updated)
        rebalance.result()
        for b in bucket:
            self.check_dataloss_for_high_ops_loader(self.master, b, self.initial_load, start_document=0, updated=True,
                                                    ops=self.initial_load * (updated - 1),
                                                    batch=self.batch_size,instances=self.instances)
            self.check_dataloss_for_high_ops_loader(self.master, b, total_doc - self.initial_load,
                                                    start_document=self.initial_load, batch=self.batch_size,
                                                    instances=self.instances)
        # Update initial doc and create more doc
        load_thread = []
        create_thread = []
        for b in bucket:
            load_thread.append(Thread(target=self.update_buckets_with_high_ops, args=(
                self.master, b, self.initial_load, self.initial_load, self.batch_size, self.threads, 0,
                self.instances, updated)))
            create_thread.append(Thread(target=self.load_buckets_with_high_ops, args=(
                self.master, b, self.num_items, self.batch_size, self.threads, start_at, self.instances)))
        for t in load_thread:
            t.start()
        for th in create_thread:
            th.start()
        total_doc += self.num_items
        start_at = total_doc
        updated += 1
        new_server_list = list(set(new_server_list + servers_in) - set(servers_out))
        self.log.info("==========Rebalance out of 1 nodes and Rebalance In 2 nodes=========")
        # Rebalance out of 1 nodes and Rebalance In 2 nodes
        servers_in = list(set(self.servers) - set(new_server_list))[0:2]
        servers_out = list(set(new_server_list) - set([self.master]))[0:1]
        rebalance = self.cluster.async_rebalance(servers_init, servers_in, servers_out)
        self.sleep(10)
        for t in load_thread:
            t.join()
        for th in create_thread:
            th.join()
        total_doc, start_at, updated = self.create_update_till_rebalance_progress(rest, bucket, total_doc, start_at,
                                                                                  updated)
        rebalance.result()
        for b in bucket:
            self.check_dataloss_for_high_ops_loader(self.master, b, self.initial_load, start_document=0, updated=True,
                                                    ops=self.initial_load * (updated - 1), batch=self.batch_size,
                                                    instances=self.instances)
            self.check_dataloss_for_high_ops_loader(self.master, b, total_doc - self.initial_load,
                                                    start_document=self.initial_load, batch=self.batch_size,
                                                    instances=self.instances)
        # Update initial doc and create more doc
        load_thread = []
        create_thread = []
        for b in bucket:
            load_thread.append(Thread(target=self.update_buckets_with_high_ops, args=(
                self.master, b, self.initial_load, self.initial_load, self.batch_size, self.threads, 0,
                self.instances, updated)))
            create_thread.append(Thread(target=self.load_buckets_with_high_ops, args=(
                self.master, b, self.num_items, self.batch_size, self.threads, start_at, self.instances)))
        for t in load_thread:
            t.start()
        for th in create_thread:
            th.start()
        total_doc += self.num_items
        start_at = total_doc
        updated += 1
        new_server_list = list(set(new_server_list + servers_in) - set(servers_out))
        self.log.info("==========Rebalance in 4 nodes =========")
        # Rebalance in 4 nodes
        servers_in = list(set(self.servers) - set(new_server_list))[0:4]
        rebalance = self.cluster.async_rebalance(servers_init, servers_in, [])
        self.sleep(10)
        for t in load_thread:
            t.join()
        for th in create_thread:
            th.join()
        total_doc, start_at, updated = self.create_update_till_rebalance_progress(rest, bucket, total_doc, start_at,
                                                                                  updated)
        rebalance.result()
        for b in bucket:
            self.check_dataloss_for_high_ops_loader(self.master, b, self.initial_load, start_document=0, updated=True,
                                                    ops=self.initial_load * (updated - 1), batch=self.batch_size,
                                                    instances=self.instances)
            self.check_dataloss_for_high_ops_loader(self.master, b, total_doc - self.initial_load,
                                                    start_document=self.initial_load, batch=self.batch_size,
                                                    instances=self.instances)
        # Update initial doc and create more doc
        load_thread = []
        create_thread = []
        for b in bucket:
            load_thread.append(Thread(target=self.update_buckets_with_high_ops, args=(
                self.master, b, self.initial_load, self.initial_load, self.batch_size, self.threads, 0,
                self.instances, updated)))
            create_thread.append(Thread(target=self.load_buckets_with_high_ops, args=(
                self.master, b, self.num_items, self.batch_size, self.threads, start_at, self.instances)))
        for t in load_thread:
            t.start()
        for th in create_thread:
            th.start()
        total_doc += self.num_items
        start_at = total_doc
        updated += 1
        new_server_list = list(set(new_server_list + servers_in))
        self.log.info("==========Rebalance out 4 nodes =========")
        # Rebalance out 4 nodes
        servers_out = list(set(new_server_list) - set([self.master]))[0:4]
        rebalance = self.cluster.async_rebalance(servers_init, [], servers_out)
        self.sleep(10)
        for t in load_thread:
            t.join()
        for th in create_thread:
            th.join()
        total_doc, start_at, updated = self.create_update_till_rebalance_progress(rest, bucket, total_doc, start_at,
                                                                                  updated)
        rebalance.result()
        for b in bucket:
            self.check_dataloss_for_high_ops_loader(self.master, b, self.initial_load, start_document=0, updated=True,
                                                    ops=self.initial_load * (updated - 1), batch=self.batch_size,
                                                    instances=self.instances)
            self.check_dataloss_for_high_ops_loader(self.master, b, total_doc - self.initial_load,
                                                    start_document=self.initial_load, batch=self.batch_size,
                                                    instances=self.instances)
        # Update initial doc and create more doc
        load_thread = []
        create_thread = []
        for b in bucket:
            load_thread.append(Thread(target=self.update_buckets_with_high_ops, args=(
                self.master, b, self.initial_load, self.initial_load, self.batch_size, self.threads, 0,
                self.instances, updated)))
            create_thread.append(Thread(target=self.load_buckets_with_high_ops, args=(
                self.master, b, self.num_items, self.batch_size, self.threads, start_at, self.instances)))
        for t in load_thread:
            t.start()
        for th in create_thread:
            th.start()
        total_doc += self.num_items
        start_at = total_doc
        updated += 1
        new_server_list = list(set(new_server_list) - set(servers_out))
        self.log.info(
            "======Rebalance in 4 nodes (8 nodes) wait for rebalance to finish and move between server groups=========")
        # Rebalance in 4 nodes (8 nodes) wait for rebalance to finish and move between server groups
        servers_in = list(set(self.servers) - set(new_server_list))[0:4]
        rebalance = self.cluster.async_rebalance(servers_init, servers_in, [])
        self.sleep(10)
        for t in load_thread:
            t.join()
        for th in create_thread:
            th.join()
        total_doc, start_at, updated = self.create_update_till_rebalance_progress(rest, bucket, total_doc, start_at,
                                                                                  updated)
        rebalance.result()
        for b in bucket:
            self.check_dataloss_for_high_ops_loader(self.master, b, self.initial_load, start_document=0, updated=True,
                                                    ops=self.initial_load * (updated - 1), batch=self.batch_size,
                                                    instances=self.instances)
            self.check_dataloss_for_high_ops_loader(self.master, b, total_doc - self.initial_load,
                                                    start_document=self.initial_load, batch=self.batch_size,
                                                    instances=self.instances)
        # Update initial doc and create more doc
        load_thread = []
        create_thread = []
        for b in bucket:
            load_thread.append(Thread(target=self.update_buckets_with_high_ops, args=(
                self.master, b, self.initial_load, self.initial_load, self.batch_size, self.threads, 0,
                self.instances, updated)))
            create_thread.append(Thread(target=self.load_buckets_with_high_ops, args=(
                self.master, b, self.num_items, self.batch_size, self.threads, start_at, self.instances)))
        for t in load_thread:
            t.start()
        for th in create_thread:
            th.start()
        total_doc += self.num_items
        start_at = total_doc
        updated += 1
        self.log.info("======Graceful failover 1 KV node and add back(Delta and Full)=========")
        # Graceful failover 1 KV node and add back(Delta and Full)
        kv_server = self.get_nodes_from_services_map(service_type="kv", get_all_nodes=False)
        fail_over_task = self.cluster.async_failover([self.master], failover_nodes=[kv_server], graceful=True)
        fail_over_task.result()
        # presumably gives the graceful failover time to settle before the
        # add-back — TODO confirm whether a poll would be more reliable.
        self.sleep(120)
        # do a recovery and rebalance
        rest.set_recovery_type('ns_1@' + kv_server.ip, recoveryType=self.recoveryType)
        rest.add_back_node('ns_1@' + kv_server.ip)
        rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init], [], [])
        self.sleep(10)
        for t in load_thread:
            t.join()
        for th in create_thread:
            th.join()
        total_doc, start_at, updated = self.create_update_till_rebalance_progress(rest, bucket, total_doc, start_at,
                                                                                  updated)
        rebalance.result()
        # Final verification is the only one that actually fails the test.
        for b in bucket:
            errors1=self.check_dataloss_for_high_ops_loader(self.master, b, self.initial_load, start_document=0, updated=True,
                                                    ops=self.initial_load * (updated - 1), batch=self.batch_size,
                                                            instances=self.instances)
            errors2=self.check_dataloss_for_high_ops_loader(self.master, b, total_doc - self.initial_load,
                                                    start_document=self.initial_load, batch=self.batch_size,
                                                            instances=self.instances)
            if len(errors1) > 0 or len(errors2) > 0:
                self.fail("data is missing");
1147
1148
1149    def load_till_rebalance_progress(self,rest,bucket,total_doc,start_at):
1150        rebalance_status = rest._rebalance_progress_status()
1151        self.log.info("###### Rebalance Status:{} ######".format(rebalance_status))
1152        self.sleep(10)
1153        while rebalance_status == 'running':
1154            self.log.info("===== Loading {} as rebalance is going on =====".format(self.reload_size))
1155            load_thread = []
1156            for b in bucket:
1157                load_thread.append(Thread(target=self.load_buckets_with_high_ops, args=(self.master,b, self.reload_size,
1158                                                                                    self.batch_size, self.threads, start_at,self.instances)))
1159            for t in load_thread:
1160                t.start()
1161            for t in load_thread:
1162                t.join()
1163            rebalance_status = rest._rebalance_progress_status()
1164            self.log.info("###### Rebalance Status:{} ######".format(rebalance_status))
1165            total_doc += self.reload_size
1166            start_at = total_doc
1167        return total_doc,start_at
1168
1169    def create_update_till_rebalance_progress(self,rest,bucket,total_doc,start_at,updated):
1170        rebalance_status = rest._rebalance_progress_status()
1171        self.log.info("###### Rebalance Status:{} ######".format(rebalance_status))
1172        self.sleep(10)
1173        while rebalance_status == 'running':
1174            self.log.info("===== Loading {} as rebalance is going on =====".format(self.reload_size))
1175            load_thread = []
1176            update_thread = []
1177            for b in bucket:
1178                load_thread.append(Thread(target=self.load_buckets_with_high_ops, args=(self.master,b, self.reload_size,
1179                                                                                    self.batch_size, self.threads,
1180                                                                                        start_at,self.threads)))
1181                update_thread.append(Thread(target=self.update_buckets_with_high_ops,
1182                                      args=(self.master,b, self.initial_load,self.initial_load,
1183                                                       self.batch_size, self.threads, 0,self.instances,updated)))
1184            for t in load_thread:
1185                t.start()
1186            for th in update_thread:
1187                th.start()
1188            for t in load_thread:
1189                t.join()
1190            for th in update_thread:
1191                th.join()
1192            rebalance_status = rest._rebalance_progress_status()
1193            self.log.info("###### Rebalance Status:{} ######".format(rebalance_status))
1194            total_doc += self.reload_size
1195            start_at = total_doc
1196            updated += 1
1197        return total_doc,start_at,updated
1198
1199    def shuffle_nodes_between_zones_and_rebalance(self, to_remove=None):
1200        """
1201        Shuffle the nodes present in the cluster if zone > 1. Rebalance the nodes in the end.
1202        Nodes are divided into groups iteratively i.e. 1st node in Group 1, 2nd in Group 2, 3rd in Group 1 and so on, when
1203        zone=2.
1204        :param to_remove: List of nodes to be removed.
1205        """
1206        if not to_remove:
1207            to_remove = []
1208        serverinfo = self.servers[0]
1209        rest = RestConnection(serverinfo)
1210        zones = ["Group 1"]
1211        nodes_in_zone = {"Group 1": [serverinfo.ip]}
1212        # Create zones, if not existing, based on params zone in test.
1213        # Shuffle the nodes between zones.
1214        if int(self.zone) > 1:
1215            for i in range(1, int(self.zone)):
1216                a = "Group "
1217                zones.append(a + str(i + 1))
1218                if not rest.is_zone_exist(zones[i]):
1219                    rest.add_zone(zones[i])
1220                nodes_in_zone[zones[i]] = []
1221            # Divide the nodes between zones.
1222            nodes_in_cluster = [node.ip for node in self.get_nodes_in_cluster()]
1223            nodes_to_remove = [node.ip for node in to_remove]
1224            for i in range(1, len(self.servers)):
1225                if self.servers[i].ip in nodes_in_cluster and self.servers[i].ip not in nodes_to_remove:
1226                    server_group = i % int(self.zone)
1227                    nodes_in_zone[zones[server_group]].append(self.servers[i].ip)
1228            # Shuffle the nodesS
1229            for i in range(1, self.zone):
1230                node_in_zone = list(set(nodes_in_zone[zones[i]]) -
1231                                    set([node for node in rest.get_nodes_in_zone(zones[i])]))
1232                rest.shuffle_nodes_in_zones(node_in_zone, zones[0], zones[i])
1233        otpnodes = [node.id for node in rest.node_statuses()]
1234        nodes_to_remove = [node.id for node in rest.node_statuses() if node.ip in [t.ip for t in to_remove]]
1235        # Start rebalance and monitor it.
1236        started = rest.rebalance(otpNodes=otpnodes, ejectedNodes=nodes_to_remove)
1237        if started:
1238            result = rest.monitorRebalance()
1239            msg = "successfully rebalanced cluster {0}"
1240            self.log.info(msg.format(result))
1241        # Verify replicas of one node should not be in the same zone as active vbuckets of the node.
1242        if self.zone > 1:
1243            self._verify_replica_distribution_in_zones(nodes_in_zone)
1244
1245    def update_buckets_with_high_ops(self, server, bucket, items, ops,
1246                                     batch=20000, threads=5, start_document=0,
1247                                     instances=1,update_counter=1):
1248        import subprocess
1249        #cmd_format = "python scripts/high_ops_doc_loader.py  --node {0} --bucket {1} --user {2} --password {3} " \
1250        #             "--count {4} --batch_size {5} --threads {6} --start_document {7} --cb_version {8} --instances {" \
1251        #             "9} --ops {10} --updates --update_counter {11}"
1252        cmd_format = "python scripts/thanosied.py  --spec couchbase://{0} --bucket {1} --user {2} --password {3} " \
1253                     "--count {4} --batch_size {5} --threads {6} --start_document {7} --cb_version {8} --workers {9} --rate_limit {10} " \
1254                     "--passes 1  --update_counter {11}"
1255        cb_version = RestConnection(server).get_nodes_version()[:3]
1256        if self.num_replicas > 1:
1257            cmd_format = "{} --replicate_to 1".format(cmd_format)
1258        cmd = cmd_format.format(server.ip, bucket.name, server.rest_username,
1259                                server.rest_password,
1260                                items, batch, threads, start_document,
1261                                cb_version, instances, int(ops),update_counter)
1262        self.log.info("Running {}".format(cmd))
1263        result = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
1264                                  stderr=subprocess.PIPE)
1265        output = result.stdout.read()
1266        error = result.stderr.read()
1267        if error:
1268            self.log.error(error)
1269            self.fail("Failed to run the loadgen.")
1270        if output:
1271            loaded = output.split('\n')[:-1]
1272            total_loaded = 0
1273            for load in loaded:
1274                total_loaded += int(load.split(':')[1].strip())
1275            self.assertEqual(total_loaded, ops,
1276                             "Failed to update {} items. Loaded only {} items".format(
1277                                 ops,
1278                                 total_loaded))
1279
1280    def load_buckets_with_high_ops(self, server, bucket, items, batch=20000, threads=10, start_document=0, instances=1
1281                                   ,ttl=0):
1282        import subprocess
1283        #cmd_format = "python scripts/high_ops_doc_loader.py  --node {0} --bucket {1} --user {2} --password {3} " \
1284        #             "--count {4} --batch_size {5} --threads {6} --start_document {7} --cb_version {8} --instances {9} --ttl {10}"
1285        cmd_format = "python scripts/thanosied.py  --spec couchbase://{0} --bucket {1} --user {2} --password {3} " \
1286                     "--count {4} --batch_size {5} --threads {6} --start_document {7} --cb_version {8} --workers {9} --ttl {10}" \
1287                     "--passes 1"
1288        cb_version = RestConnection(server).get_nodes_version()[:3]
1289        if self.num_replicas > 0 and self.use_replica_to:
1290            cmd_format = "{} --replicate_to 1".format(cmd_format)
1291        cmd = cmd_format.format(server.ip, bucket.name, server.rest_username, server.rest_password, items, batch,
1292                                threads, start_document, cb_version, instances, ttl)
1293        self.log.info("Running {}".format(cmd))
1294        result = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1295        output = result.stdout.read()
1296        error = result.stderr.read()
1297        if error:
1298            self.log.error(error)
1299            self.fail("Failed to run the loadgen.")
1300        if output:
1301            loaded = output.split('\n')[:-1]
1302            total_loaded = 0
1303            for load in loaded:
1304                total_loaded += int(load.split(':')[1].strip())
1305            self.assertEqual(total_loaded, items,
1306                             "Failed to load {} items. Loaded only {} items".format(items, total_loaded))
1307
1308    def load_docs(self, bucket,num_items=0, start_document=0):
1309        if self.loader == "pillowfight":
1310            load_thread = Thread(target=self.load,
1311                                 name="pillowfight_load",
1312                                 args=(self.master, self.num_items, self.batch_size, self.doc_size, self.rate_limit))
1313            return load_thread
1314        elif self.loader == "high_ops":
1315            if num_items == 0:
1316                num_items = self.num_items
1317            load_thread = Thread(target=self.load_buckets_with_high_ops,
1318                                 name="high_ops_load",
1319                                 args=(self.master, bucket, num_items, self.batch_size,
1320                                       self.threads, start_document, self.instances))
1321            return load_thread
1322
1323    def check_data(self, server, bucket, num_items=0):
1324        if self.loader == "pillowfight":
1325            return self.check_dataloss(server, bucket,num_items)
1326        elif self.loader == "high_ops":
1327            return self.check_dataloss_for_high_ops_loader(server, bucket, num_items)
1328
1329    def check_dataloss_for_high_ops_loader(self, server, bucket, items,
1330                                           batch=2000, threads=5,
1331                                           start_document=0,
1332                                           updated=False, ops=0,instances=1):
1333        import subprocess
1334        from lib.memcached.helper.data_helper import VBucketAwareMemcached
1335        #cmd_format = "python scripts/high_ops_doc_loader.py  --node {0} --bucket {1} --user {2} --password {3} " \
1336        #             "--count {4} " \
1337        #             "--batch_size {5} --instances {9} --threads {6} --start_document {7} --cb_version {8} --validate"
1338        cmd_format = "python scripts/thanosied.py  --spec couchbase://{0} --bucket {1} --user {2} --password {3} " \
1339                     "--count {4} --batch_size {5} --threads {6} --start_document {7} --cb_version {8} --workers {9} --validation 1 " \
1340                     "--passes 1"
1341        cb_version = RestConnection(server).get_nodes_version()[:3]
1342        if updated:
1343            cmd_format = "{} --updated --ops {}".format(cmd_format, int(ops))
1344        cmd = cmd_format.format(server.ip, bucket.name, server.rest_username,
1345                                server.rest_password,
1346                                int(items), batch, threads, start_document, cb_version,instances)
1347        self.log.info("Running {}".format(cmd))
1348        result = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
1349                                  stderr=subprocess.PIPE)
1350        output = result.stdout.read()
1351        error = result.stderr.read()
1352        errors = []
1353        rest = RestConnection(self.master)
1354        VBucketAware = VBucketAwareMemcached(rest, bucket.name)
1355        _, _, _ = VBucketAware.request_map(rest, bucket.name)
1356        if error:
1357            self.log.error(error)
1358            self.fail("Failed to run the loadgen validator.")
1359        if output:
1360            loaded = output.split('\n')[:-1]
1361            for load in loaded:
1362                if "Missing keys:" in load:
1363                    keys = load.split(":")[1].strip().replace('[', '').replace(']', '')
1364                    keys = keys.split(',')
1365                    for key in keys:
1366                        key = key.strip()
1367                        key = key.replace('\'', '').replace('\\', '')
1368                        vBucketId = VBucketAware._get_vBucket_id(key)
1369                        errors.append(
1370                            ("Missing key: {0}, VBucketId: {1}".format(key, vBucketId)))
1371                if "Mismatch keys: " in load:
1372                    keys = load.split(":")[1].strip().replace('[', '').replace(']', '')
1373                    keys = keys.split(',')
1374                    for key in keys:
1375                        key = key.strip()
1376                        key = key.replace('\'', '').replace('\\', '')
1377                        vBucketId = VBucketAware._get_vBucket_id(key)
1378                        errors.append((
1379                                      "Wrong value for key: {0}, VBucketId: {1}".format(
1380                                          key, vBucketId)))
1381        self.log.info("Total number of missing doc:{}".format(len(errors)))
1382        self.log.info("Missing/Mismatch keys:{}".format(errors))
1383        return errors
1384
    def test_volume_with_high_ops_reproduce(self):
        """Volume test: start high-ops document loading on every bucket, wait
        until the cluster's item count crosses a fixed threshold, rebalance in
        one node while the load is still running, then validate that no
        documents were lost.
        """
        rest = RestConnection(self.master)
        bucket = rest.get_buckets()
        start_at = 0
        # load initial documents
        self.create_ddocs_and_views()
        # One loader thread per bucket, each creating self.num_items docs.
        load_thread = []
        for bk in bucket:
            load_thread.append(Thread(target=self.load_buckets_with_high_ops, args=(
            self.master, bk, self.num_items, self.batch_size, self.threads, start_at, self.instances)))
        for t in load_thread:
            t.start()
        # Poll bucket stats every 5 minutes until the item count crosses the
        # hard-coded 1.2M threshold before triggering the rebalance.
        # NOTE(review): presumably this assumes num_items > 1200000 in the
        # test conf; otherwise this loop never exits -- confirm.
        stats_dst = rest.get_bucket_stats()
        while stats_dst["curr_items"] < 1200000:
            self.sleep(300)
            stats_dst = rest.get_bucket_stats()
        # Rebalance in 1 node
        servers_init = self.servers[:self.nodes_init]
        # Rebalance in 1 node
        self.log.info("==========rebalance in 1 node=========")
        servers_in = self.servers[self.nodes_init:self.nodes_init + 1]
        rebalance = self.cluster.async_rebalance(servers_init, servers_in, [])
        rebalance.result()
        # Wait for all loaders to finish before validating.
        for t in load_thread:
            t.join()
        # Validate every bucket for missing/mismatched keys; any error fails
        # the test.
        for b in bucket:
            errors=self.check_dataloss_for_high_ops_loader(self.master, b, self.num_items,instances=self.instances)
            if len(errors) > 0:
                self.fail("data is missing");