1import time, os
2
3from threading import Thread
4import threading
5from basetestcase import BaseTestCase
6from rebalance.rebalance_base import RebalanceBaseTest
7from membase.api.exception import RebalanceFailedException
8from membase.api.rest_client import RestConnection, RestHelper
9from couchbase_helper.documentgenerator import BlobGenerator
10from membase.helper.rebalance_helper import RebalanceHelper
11from remote.remote_util import RemoteMachineShellConnection
12from membase.helper.cluster_helper import ClusterOperationHelper
13from couchbase.bucket import Bucket
14from couchbase.cluster import Cluster, PasswordAuthenticator
15from couchbase.exceptions import NotFoundError, CouchbaseError
16
17from lib.couchbase_helper.tuq_helper import N1QLHelper
18from lib.memcached.helper.data_helper import VBucketAwareMemcached
19from couchbase_helper.document import DesignDocument, View
20
21
22class RebalanceHighOpsWithPillowFight(BaseTestCase):
23    def setUp(self):
24        super(RebalanceHighOpsWithPillowFight, self).setUp()
25        self.rate_limit = self.input.param("rate_limit", 100000)
26        self.batch_size = self.input.param("batch_size", 1000)
27        self.doc_size = self.input.param("doc_size", 100)
28        self.loader = self.input.param("loader", "pillowfight")
29        self.instances = self.input.param("instances", 1)
30        self.recovery_type = self.input.param("recovery_type", None)
31        self.node_out = self.input.param("node_out", 0)
32        self.threads = self.input.param("threads", 5)
33        self.use_replica_to = self.input.param("use_replica_to",False)
34        self.run_with_views = self.input.param("run_with_views", False)
35        self.default_view_name = "upgrade-test-view"
36        self.ddocs_num = self.input.param("ddocs-num", 1)
37        self.view_num = self.input.param("view-per-ddoc", 2)
38        self.is_dev_ddoc = self.input.param("is-dev-ddoc", False)
39        self.ddocs = []
40        self.run_view_query_iterations = self.input.param(
41            "run_view_query_iterations", 30)
42        self.rebalance_quirks = self.input.param('rebalance_quirks', False)
43
44        if self.rebalance_quirks:
45            for server in self.servers:
46                rest = RestConnection(server)
47                rest.diag_eval(
48                    "[ns_config:set({node, N, extra_rebalance_quirks}, [reset_replicas, trivial_moves]) || N <- ns_node_disco:nodes_wanted()].")
49                #rest.diag_eval(
50                #    "[ns_config:set({node, N, disable_rebalance_quirks}, [disable_old_master]) || N <- ns_node_disco:nodes_wanted()].")
51
52    def tearDown(self):
53        super(RebalanceHighOpsWithPillowFight, self).tearDown()
54
55    def load_buckets_with_high_ops(self, server, bucket, items, batch=20000,
56                                   threads=5, start_document=0, instances=1, ttl=0):
57        import subprocess
58        # cmd_format = "python scripts/high_ops_doc_gen.py  --node {0} --bucket {1} --user {2} --password {3} " \
59        #              "--count {4} --batch_size {5} --threads {6} --start_document {7} --cb_version {8} --instances {9} --ttl {10}"
60        cmd_format = "python scripts/thanosied.py  --spec couchbase://{0} --bucket {1} --user {2} --password {3} " \
61                     "--count {4} --batch_size {5} --threads {6} --start_document {7} --cb_version {8} --workers {9} --ttl {10}" \
62                     "--passes 1"
63        cb_version = RestConnection(server).get_nodes_version()[:3]
64        if self.num_replicas > 0 and self.use_replica_to:
65            cmd_format = "{} --replicate_to 1".format(cmd_format)
66        cmd = cmd_format.format(server.ip, bucket.name, server.rest_username,
67                                server.rest_password,
68                                items, batch, threads, start_document,
69                                cb_version, instances, ttl)
70        self.log.info("Running {}".format(cmd))
71        result = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
72                                  stderr=subprocess.PIPE)
73        output = result.stdout.read()
74        error = result.stderr.read()
75        if error:
76            self.log.error(error)
77            self.fail("Failed to run the loadgen.")
78        if output:
79            loaded = output.split('\n')[:-1]
80            total_loaded = 0
81            for load in loaded:
82                total_loaded += int(load.split(':')[1].strip())
83            self.assertEqual(total_loaded, items,
84                             "Failed to load {} items. Loaded only {} items".format(
85                                 items,
86                                 total_loaded))
87
88    def update_buckets_with_high_ops(self, server, bucket, items, ops,
89                                     batch=20000, threads=5, start_document=0,
90                                     instances=1):
91        import subprocess
92        # cmd_format = "python scripts/high_ops_doc_gen.py  --node {0} --bucket {1} --user {2} --password {3} " \
93        #              "--count {4} --batch_size {5} --threads {6} --start_document {7} --cb_version {8} --instances {" \
94        #              "9} --ops {10} --updates"
95        cmd_format = "python scripts/thanosied.py  --spec couchbase://{0} --bucket {1} --user {2} --password {3} " \
96                     "--count {4} --batch_size {5} --threads {6} --start_document {7} --cb_version {8} --workers {9} --rate_limit {10} " \
97                     "--passes 1  --update_counter {11}"
98        cb_version = RestConnection(server).get_nodes_version()[:3]
99        if self.num_replicas > 0 and self.use_replica_to:
100            cmd_format = "{} --replicate_to 1".format(cmd_format)
101        cmd = cmd_format.format(server.ip, bucket.name, server.rest_username,
102                                server.rest_password,
103                                items, batch, threads, start_document,
104                                cb_version, instances, ops)
105        self.log.info("Running {}".format(cmd))
106        result = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
107                                  stderr=subprocess.PIPE)
108        output = result.stdout.read()
109        error = result.stderr.read()
110        if error:
111            self.log.error(error)
112            self.fail("Failed to run the loadgen.")
113        if output:
114            loaded = output.split('\n')[:-1]
115            total_loaded = 0
116            for load in loaded:
117                total_loaded += int(load.split(':')[1].strip())
118            self.assertEqual(total_loaded, ops,
119                             "Failed to update {} items. Loaded only {} items".format(
120                                 ops,
121                                 total_loaded))
122
123    def delete_buckets_with_high_ops(self, server, bucket, items, ops,
124                                     batch=20000, threads=5,
125                                     start_document=0,
126                                     instances=1):
127        import subprocess
128        # cmd_format = "python scripts/high_ops_doc_gen.py  --node {0} --bucket {1} --user {2} --password {3} " \
129        #              "--count {4} --batch_size {5} --threads {6} --start_document {7} --cb_version {8} --instances {" \
130        #              "9} --ops {10} --delete"
131        cmd_format = "python scripts/thanosied.py  --spec couchbase://{0} --bucket {1} --user {2} --password {3} " \
132                     "--count {4} --batch_size {5} --threads {6} --start_document {7} --cb_version {8} --workers {9} --rate_limit {10} " \
133                     "--passes 1  --delete --num_delete {4}"
134        cb_version = RestConnection(server).get_nodes_version()[:3]
135        if self.num_replicas > 0 and self.use_replica_to:
136            cmd_format = "{} --replicate_to 1".format(cmd_format)
137        cmd = cmd_format.format(server.ip, bucket.name, server.rest_username,
138                                server.rest_password,
139                                items, batch, threads, start_document,
140                                cb_version, instances, ops)
141        self.log.info("Running {}".format(cmd))
142        result = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
143                                  stderr=subprocess.PIPE)
144        output = result.stdout.read()
145        error = result.stderr.read()
146        if error:
147            self.log.error(error)
148            self.fail("Failed to run the loadgen.")
149        if output:
150            loaded = output.split('\n')[:-1]
151            total_loaded = 0
152            for load in loaded:
153                total_loaded += int(load.split(':')[1].strip())
154            self.assertEqual(total_loaded, ops,
155                             "Failed to update {} items. Loaded only {} items".format(
156                                 ops,
157                                 total_loaded))
158
159    def load(self, server, items, batch=1000, docsize=100, rate_limit=100000,
160             start_at=0):
161        import subprocess
162        from lib.testconstants import COUCHBASE_FROM_SPOCK
163        rest = RestConnection(server)
164        import multiprocessing
165
166        num_threads = multiprocessing.cpu_count() / 2
167        num_cycles = int(items / batch * 1.5 / num_threads)
168
169        cmd = "cbc-pillowfight -U couchbase://{0}/default -I {1} -m {3} -M {3} -B {2} -c {5} --sequential --json -t {4} --rate-limit={6} --start-at={7}" \
170            .format(server.ip, items, batch, docsize, num_threads, num_cycles,
171                    rate_limit, start_at)
172
173        if self.num_replicas > 0 and self.use_replica_to:
174            cmd += " --replicate-to=1"
175        if rest.get_nodes_version()[:5] in COUCHBASE_FROM_SPOCK:
176            cmd += " -u Administrator -P password"
177        self.log.info("Executing '{0}'...".format(cmd))
178        rc = subprocess.call(cmd, shell=True)
179        if rc != 0:
180            self.fail(
181                "Exception running cbc-pillowfight: subprocess module returned non-zero response!")
182
183    def load_docs(self, num_items=0, start_document=0,ttl=0):
184        if num_items == 0:
185            num_items = self.num_items
186        if self.loader == "pillowfight":
187            load_thread = Thread(target=self.load,
188                                 name="pillowfight_load",
189                                 args=(
190                                     self.master, num_items, self.batch_size,
191                                     self.doc_size, self.rate_limit,
192                                     start_document))
193            return load_thread
194        elif self.loader == "high_ops":
195            if num_items == 0:
196                num_items = self.num_items
197            load_thread = Thread(target=self.load_buckets_with_high_ops,
198                                 name="high_ops_load",
199                                 args=(self.master, self.buckets[0], num_items,
200                                       self.batch_size,
201                                       self.threads, start_document,
202                                       self.instances, ttl))
203            return load_thread
204
205    def check_dataloss_for_high_ops_loader(self, server, bucket, items,
206                                           batch=20000, threads=5,
207                                           start_document=0,
208                                           updated=False, ops=0, ttl=0, deleted=False, deleted_items=0):
209        import subprocess
210        from lib.memcached.helper.data_helper import VBucketAwareMemcached
211
212        cmd_format = "python scripts/high_ops_doc_gen.py  --node {0} --bucket {1} --user {2} --password {3} " \
213                     "--count {4} " \
214                     "--batch_size {5} --threads {6} --start_document {7} --cb_version {8} --validate"
215        cb_version = RestConnection(server).get_nodes_version()[:3]
216        if updated:
217            cmd_format = "{} --updated --ops {}".format(cmd_format, ops)
218        if deleted:
219            cmd_format = "{} --deleted --deleted_items {}".format(cmd_format, deleted_items)
220        if ttl > 0:
221            cmd_format = "{} --ttl {}".format(cmd_format, ttl)
222        cmd = cmd_format.format(server.ip, bucket.name, server.rest_username,
223                                server.rest_password,
224                                int(items), batch, threads, start_document, cb_version)
225        self.log.info("Running {}".format(cmd))
226        result = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
227                                  stderr=subprocess.PIPE)
228        output = result.stdout.read()
229        error = result.stderr.read()
230        errors = []
231        rest = RestConnection(self.master)
232        VBucketAware = VBucketAwareMemcached(rest, bucket.name)
233        _, _, _ = VBucketAware.request_map(rest, bucket.name)
234        if error:
235            self.log.error(error)
236            self.fail("Failed to run the loadgen validator.")
237        if output:
238            loaded = output.split('\n')[:-1]
239            for load in loaded:
240                if "Missing keys:" in load:
241                    keys = load.split(":")[1].strip().replace('[', '').replace(']', '')
242                    keys = keys.split(',')
243                    for key in keys:
244                        key = key.strip()
245                        key = key.replace('\'', '').replace('\\', '')
246                        vBucketId = VBucketAware._get_vBucket_id(key)
247                        errors.append(
248                            ("Missing key: {0}, VBucketId: {1}".format(key, vBucketId)))
249                if "Mismatch keys: " in load:
250                    keys = load.split(":")[1].strip().replace('[', '').replace(']', '')
251                    keys = keys.split(',')
252                    for key in keys:
253                        key = key.strip()
254                        key = key.replace('\'', '').replace('\\', '')
255                        vBucketId = VBucketAware._get_vBucket_id(key)
256                        errors.append((
257                                      "Wrong value for key: {0}, VBucketId: {1}".format(
258                                          key, vBucketId)))
259        return errors
260
261    def check_dataloss(self, server, bucket, num_items):
262        if RestConnection(server).get_nodes_version()[:5] < '5':
263            bkt = Bucket('couchbase://{0}/{1}'.format(server.ip, bucket.name))
264        else:
265            cluster = Cluster("couchbase://{}".format(server.ip))
266            auth = PasswordAuthenticator(server.rest_username,
267                                         server.rest_password)
268            cluster.authenticate(auth)
269            bkt = cluster.open_bucket(bucket.name)
270
271        rest = RestConnection(self.master)
272        VBucketAware = VBucketAwareMemcached(rest, bucket.name)
273        _, _, _ = VBucketAware.request_map(rest, bucket.name)
274        batch_start = 0
275        batch_end = 0
276        batch_size = 10000
277        errors = []
278        while num_items > batch_end:
279            batch_end = batch_start + batch_size
280            keys = []
281            for i in xrange(batch_start, batch_end, 1):
282                keys.append(str(i).rjust(20, '0'))
283            try:
284                bkt.get_multi(keys)
285                self.log.info(
286                    "Able to fetch keys starting from {0} to {1}".format(
287                        keys[0], keys[len(keys) - 1]))
288            except Exception as e:
289                self.log.error(e)
290                self.log.info("Now trying keys in the batch one at a time...")
291                key = ''
292                try:
293                    for key in keys:
294                        bkt.get(key)
295                except NotFoundError:
296                    vBucketId = VBucketAware._get_vBucket_id(key)
297                    errors.append("Missing key: {0}, VBucketId: {1}".
298                                  format(key, vBucketId))
299            batch_start += batch_size
300        return errors
301
302    def check_data(self, server, bucket, num_items=0, start_document=0,
303                   updated=False, ops=0, batch_size=0, ttl=0, deleted=False,
304                   deleted_items=0):
305        if batch_size == 0:
306            batch_size = self.batch_size
307        if self.loader == "pillowfight":
308            return self.check_dataloss(server, bucket, num_items)
309        elif self.loader == "high_ops":
310            return self.check_dataloss_for_high_ops_loader(server, bucket,
311                                                           num_items,
312                                                           self.batch_size,
313                                                           self.threads,
314                                                           start_document,
315                                                           updated, ops, ttl,
316                                                           deleted, deleted_items)
317
318    def test_rebalance_in(self):
319        rest = RestConnection(self.master)
320        bucket = rest.get_buckets()[0]
321
322        #servers_in = []
323        #for i in range(0,self.nodes_in):
324        #    servers_in.append(self.servers[self.nodes_init+i])
325
326        #self.log.info("Servers In : {0}".format(servers_in))
327
328        load_thread = self.load_docs()
329        if self.run_with_views:
330            self.log.info('creating ddocs and views')
331            self.create_ddocs_and_views()
332            self.log.info('starting the view query thread...')
333            view_query_thread = self.run_view_queries()
334            view_query_thread.start()
335        self.log.info('starting the load thread...')
336        load_thread.start()
337        load_thread.join()
338        load_thread = self.load_docs(num_items=(self.num_items * 2),
339                                     start_document=self.num_items)
340        load_thread.start()
341        rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init],
342                                                 self.servers[
343                                                 self.nodes_init:self.nodes_init + self.nodes_in],
344                                                 [])
345        #rebalance.result()
346        rest.monitorRebalance(stop_if_loop=False)
347        load_thread.join()
348        if self.run_with_views:
349            view_query_thread.join()
350        num_items_to_validate = self.num_items * 3
351        errors = self.check_data(self.master, bucket, num_items_to_validate)
352        if errors:
353            self.log.info("Printing missing keys:")
354        for error in errors:
355            print error
356        if num_items_to_validate != rest.get_active_key_count(bucket):
357            self.fail(
358                "FATAL: Data loss detected!! Docs loaded : {0}, docs present: {1}".
359                    format(num_items_to_validate,
360                           rest.get_active_key_count(bucket)))
361        if self.num_replicas > 0:
362            self.assertEqual(num_items_to_validate,
363                             (rest.get_replica_key_count(
364                                 bucket) / self.num_replicas),
365                             "Not all keys present in replica vbuckets. Expected No. of items : {0}, Item count per replica: {1}".format(
366                                 num_items_to_validate, (
367                                 rest.get_replica_key_count(
368                                     bucket) / self.num_replicas)))
369
370    def test_rebalance_in_with_update_workload(self):
371        rest = RestConnection(self.master)
372        bucket = rest.get_buckets()[0]
373        load_thread = self.load_docs()
374        if self.run_with_views:
375            self.log.info('creating ddocs and views')
376            self.create_ddocs_and_views()
377            self.log.info('starting the view query thread...')
378            view_query_thread = self.run_view_queries()
379            view_query_thread.start()
380        self.log.info('starting the load thread...')
381        load_thread.start()
382        load_thread.join()
383
384        update_thread = Thread(target=self.update_buckets_with_high_ops,
385                             name="update_high_ops_load",
386                             args=(self.master, self.buckets[0], self.num_items,
387                                   self.num_items * 2, self.batch_size,
388                                   self.threads, 0, self.instances))
389
390        update_thread.start()
391        rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init],
392                                                 self.servers[
393                                                 self.nodes_init:self.nodes_init + self.nodes_in],
394                                                 [])
395        # rebalance.result()
396        rest.monitorRebalance(stop_if_loop=False)
397        update_thread.join()
398        if self.run_with_views:
399            view_query_thread.join()
400        num_items_to_validate = self.num_items
401        errors = self.check_data(self.master, bucket, num_items_to_validate, 0, True, self.num_items * 2)
402        if errors:
403            self.log.info("Printing missing keys:")
404        for error in errors:
405            print error
406        if num_items_to_validate != rest.get_active_key_count(bucket):
407            self.fail(
408                "FATAL: Data loss detected!! Docs loaded : {0}, docs present: {1}".
409                    format(num_items_to_validate,
410                           rest.get_active_key_count(bucket)))
411        else:
412            if errors:
413                self.fail("FATAL : Few mutations missed. See above for details.")
414
415        if self.num_replicas > 0:
416            self.assertEqual(num_items_to_validate,
417                             (rest.get_replica_key_count(
418                                 bucket) / self.num_replicas),
419                             "Not all keys present in replica vbuckets. Expected No. of items : {0}, Item count per replica: {1}".format(
420                                 num_items_to_validate, (
421                                     rest.get_replica_key_count(
422                                         bucket) / self.num_replicas)))
423
424    def test_rebalance_in_with_delete_workload(self):
425        rest = RestConnection(self.master)
426        bucket = rest.get_buckets()[0]
427        load_thread = self.load_docs()
428        if self.run_with_views:
429            self.log.info('creating ddocs and views')
430            self.create_ddocs_and_views()
431            self.log.info('starting the view query thread...')
432            view_query_thread = self.run_view_queries()
433            view_query_thread.start()
434        self.log.info('starting the load thread...')
435        load_thread.start()
436        load_thread.join()
437
438        delete_thread = Thread(target=self.delete_buckets_with_high_ops,
439                               name="delete_high_ops_load",
440                               args=(
441                               self.master, self.buckets[0], self.num_items,
442                               self.num_items, self.batch_size,
443                               self.threads, 0, self.instances))
444
445        delete_thread.start()
446        rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init],
447                                                 self.servers[
448                                                 self.nodes_init:self.nodes_init + self.nodes_in],
449                                                 [])
450        # rebalance.result()
451        rest.monitorRebalance(stop_if_loop=False)
452        delete_thread.join()
453        if self.run_with_views:
454            view_query_thread.join()
455        num_items_to_validate = self.num_items
456        errors = self.check_data(self.master, bucket, num_items_to_validate, 0,
457                                 deleted=True, deleted_items=num_items_to_validate)
458        if errors:
459            self.log.info("Printing missing keys:")
460        for error in errors:
461            print error
462        if rest.get_active_key_count(bucket) != 0:
463            self.fail(
464                "FATAL: Data loss detected!! Docs Deleted : {0}, docs present: {1}".
465                    format(num_items_to_validate,
466                           rest.get_active_key_count(bucket)))
467        else:
468            if errors:
469                self.fail(
470                    "FATAL : Few mutations missed. See above for details.")
471
472        if self.num_replicas > 0:
473            self.assertEqual(0,
474                             (rest.get_replica_key_count(
475                                 bucket) / self.num_replicas),
476                             "Not all keys deleted from replica vbuckets. Expected No. of items : {0}, Item count per replica: {1}".format(
477                                 num_items_to_validate, (
478                                     rest.get_replica_key_count(
479                                         bucket) / self.num_replicas)))
480
481    def test_rebalance_in_with_expiry(self):
482        rest = RestConnection(self.master)
483        bucket = rest.get_buckets()[0]
484        load_thread = self.load_docs(ttl=10)
485        if self.run_with_views:
486            self.log.info('creating ddocs and views')
487            self.create_ddocs_and_views()
488            self.log.info('starting the view query thread...')
489            view_query_thread = self.run_view_queries()
490            view_query_thread.start()
491        self.log.info('starting the load thread...')
492        load_thread.start()
493        load_thread.join()
494
495        # Allow docs to expire
496        self.sleep(15)
497
498        validate_thread = Thread(target=self.check_data,
499                               name="update_high_ops_load",
500                               args=(self.master, bucket, self.num_items, 0, False, 0, 100))
501
502        validate_thread.start()
503        rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init],
504                                                 self.servers[
505                                                 self.nodes_init:self.nodes_init + self.nodes_in],
506                                                 [])
507        # rebalance.result()
508        rest.monitorRebalance(stop_if_loop=False)
509        validate_thread.join()
510
511        if self.run_with_views:
512            view_query_thread.join()
513
514        num_items_to_validate = self.num_items
515        errors = self.check_data(self.master, bucket, num_items_to_validate, ttl=10)
516        if errors:
517            self.log.info("Printing missing keys:")
518        for error in errors:
519            print error
520        if rest.get_active_key_count(bucket) != 0:
521            self.fail(
522                "FATAL: Data loss detected!! Docs expired : {0}, docs present: {1}".
523                    format(num_items_to_validate,
524                           rest.get_active_key_count(bucket)))
525        else:
526            if errors:
527                self.fail(
528                    "FATAL : Few mutations missed. See above for details.")
529
530        if self.num_replicas > 0:
531            self.assertEqual(0,
532                             (rest.get_replica_key_count(
533                                 bucket) / self.num_replicas),
534                             "Not all keys deleted from replica vbuckets. Expected No. of items : {0}, Item count per replica: {1}".format(
535                                 0, (rest.get_replica_key_count(bucket) / self.num_replicas)))
536
537    def test_rebalance_out(self):
538        servs_out = [self.servers[self.nodes_init - i - 1] for i in
539                     range(self.nodes_out)]
540
541        #servs_out = []
542        #for i in range(0, self.nodes_out):
543        #    servs_out.append(self.servers[self.nodes_init -1 - i])
544
545        #self.log.info("Servers In : {0}".format(servs_out))
546
547        self.log.info("Servers Out: {0}".format(servs_out))
548        rest = RestConnection(self.master)
549        bucket = rest.get_buckets()[0]
550        load_thread = self.load_docs()
551
552        if self.run_with_views:
553            self.log.info('creating ddocs and views')
554            self.create_ddocs_and_views()
555            self.log.info('starting the view query thread...')
556            view_query_thread = self.run_view_queries()
557            view_query_thread.start()
558
559        self.log.info('starting the load thread...')
560        load_thread.start()
561        load_thread.join()
562        load_thread = self.load_docs(num_items=(self.num_items * 2),
563                                     start_document=self.num_items)
564        load_thread.start()
565        rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init],
566                                                 [], servs_out)
567        # rebalance.result()
568        rest.monitorRebalance(stop_if_loop=False)
569        load_thread.join()
570
571        if self.run_with_views:
572            view_query_thread.join()
573
574        num_items_to_validate = self.num_items * 3
575        errors = self.check_data(self.master, bucket, num_items_to_validate)
576        if errors:
577            self.log.info("Printing missing keys:")
578        for error in errors:
579            print error
580        if num_items_to_validate != rest.get_active_key_count(bucket):
581            self.fail(
582                "FATAL: Data loss detected!! Docs loaded : {0}, docs present: {1}".
583                    format(num_items_to_validate,
584                           rest.get_active_key_count(bucket)))
585        if self.num_replicas > 0:
586            self.assertEqual(num_items_to_validate,
587                             (rest.get_replica_key_count(
588                                 bucket) / self.num_replicas),
589                             "Not all keys present in replica vbuckets. Expected No. of items : {0}, Item count per replica: {1}".format(
590                                 num_items_to_validate, (
591                                 rest.get_replica_key_count(
592                                     bucket) / self.num_replicas)))
593
594    def test_rebalance_out_with_update_workload(self):
595        servs_out = [self.servers[self.nodes_init - i - 1] for i in
596                     range(self.nodes_out)]
597        rest = RestConnection(self.master)
598        bucket = rest.get_buckets()[0]
599        load_thread = self.load_docs()
600        if self.run_with_views:
601            self.log.info('creating ddocs and views')
602            self.create_ddocs_and_views()
603            self.log.info('starting the view query thread...')
604            view_query_thread = self.run_view_queries()
605            view_query_thread.start()
606
607        self.log.info('starting the load thread...')
608        load_thread.start()
609        load_thread.join()
610
611        update_thread = Thread(target=self.update_buckets_with_high_ops,
612                               name="update_high_ops_load",
613                               args=(
614                               self.master, self.buckets[0], self.num_items,
615                               self.num_items * 2, self.batch_size,
616                               self.threads, 0, self.instances))
617
618        update_thread.start()
619        rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init],
620                                                 [], servs_out)
621        # rebalance.result()
622        rest.monitorRebalance(stop_if_loop=False)
623        update_thread.join()
624        if self.run_with_views:
625            view_query_thread.join()
626        num_items_to_validate = self.num_items
627        errors = self.check_data(self.master, bucket, num_items_to_validate, 0,
628                                 True, self.num_items * 2)
629        if errors:
630            self.log.info("Printing missing keys:")
631        for error in errors:
632            print error
633        if num_items_to_validate != rest.get_active_key_count(bucket):
634            self.fail(
635                "FATAL: Data loss detected!! Docs loaded : {0}, docs present: {1}".
636                    format(num_items_to_validate,
637                           rest.get_active_key_count(bucket)))
638        else:
639            if errors:
640                self.fail(
641                    "FATAL : Few mutations missed. See above for details.")
642
643        if self.num_replicas > 0:
644            self.assertEqual(num_items_to_validate,
645                             (rest.get_replica_key_count(
646                                 bucket) / self.num_replicas),
647                             "Not all keys present in replica vbuckets. Expected No. of items : {0}, Item count per replica: {1}".format(
648                                 num_items_to_validate, (
649                                     rest.get_replica_key_count(
650                                         bucket) / self.num_replicas)))
651
652    def test_rebalance_out_with_delete_workload(self):
653        servs_out = [self.servers[self.nodes_init - i - 1] for i in
654                     range(self.nodes_out)]
655        rest = RestConnection(self.master)
656        bucket = rest.get_buckets()[0]
657        load_thread = self.load_docs()
658        if self.run_with_views:
659            self.log.info('creating ddocs and views')
660            self.create_ddocs_and_views()
661            self.log.info('starting the view query thread...')
662            view_query_thread = self.run_view_queries()
663            view_query_thread.start()
664
665        self.log.info('starting the load thread...')
666        load_thread.start()
667        load_thread.join()
668
669        delete_thread = Thread(target=self.delete_buckets_with_high_ops,
670                               name="delete_high_ops_load",
671                               args=(
672                                   self.master, self.buckets[0], self.num_items,
673                                   self.num_items, self.batch_size,
674                                   self.threads, 0, self.instances))
675
676        delete_thread.start()
677        rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init],
678                                                 [], servs_out)
679        # rebalance.result()
680        rest.monitorRebalance(stop_if_loop=False)
681        delete_thread.join()
682        if self.run_with_views:
683            view_query_thread.join()
684        num_items_to_validate = self.num_items
685        errors = self.check_data(self.master, bucket, num_items_to_validate, 0,
686                                 deleted=True,
687                                 deleted_items=num_items_to_validate)
688
689        if errors:
690            self.log.info("Printing missing keys:")
691        for error in errors:
692            print error
693        if rest.get_active_key_count(bucket) != 0:
694            self.fail(
695                "FATAL: Data loss detected!! Docs Deleted : {0}, docs present: {1}".
696                    format(num_items_to_validate,
697                           rest.get_active_key_count(bucket)))
698        else:
699            if errors:
700                self.fail(
701                    "FATAL : Few mutations missed. See above for details.")
702
703        if self.num_replicas > 0:
704            self.assertEqual(0,
705                             (rest.get_replica_key_count(
706                                 bucket) / self.num_replicas),
707                             "Not all keys deleted from replica vbuckets. Expected No. of items : {0}, Item count per replica: {1}".format(
708                                 num_items_to_validate, (
709                                     rest.get_replica_key_count(
710                                         bucket) / self.num_replicas)))
711
712    def test_rebalance_out_with_expiry(self):
713        servs_out = [self.servers[self.nodes_init - i - 1] for i in
714                     range(self.nodes_out)]
715        rest = RestConnection(self.master)
716        bucket = rest.get_buckets()[0]
717        load_thread = self.load_docs(ttl=10)
718        if self.run_with_views:
719            self.log.info('creating ddocs and views')
720            self.create_ddocs_and_views()
721            self.log.info('starting the view query thread...')
722            view_query_thread = self.run_view_queries()
723            view_query_thread.start()
724        self.log.info('starting the load thread...')
725        load_thread.start()
726        load_thread.join()
727
728        # Allow docs to expire
729        self.sleep(15)
730
731        validate_thread = Thread(target=self.check_data,
732                                 name="update_high_ops_load",
733                                 args=(
734                                 self.master, bucket, self.num_items, 0, False,
735                                 0, 100))
736
737        validate_thread.start()
738        rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init],
739                                                 [], servs_out)
740        # rebalance.result()
741        rest.monitorRebalance(stop_if_loop=False)
742        validate_thread.join()
743
744        if self.run_with_views:
745            view_query_thread.join()
746
747        num_items_to_validate = self.num_items
748        errors = self.check_data(self.master, bucket, num_items_to_validate,
749                                 ttl=10)
750        if errors:
751            self.log.info("Printing missing keys:")
752        for error in errors:
753            print error
754        if rest.get_active_key_count(bucket) != 0:
755            self.fail(
756                "FATAL: Data loss detected!! Docs expired : {0}, docs present: {1}".
757                    format(num_items_to_validate,
758                           rest.get_active_key_count(bucket)))
759        else:
760            if errors:
761                self.fail(
762                    "FATAL : Few mutations missed. See above for details.")
763
764        if self.num_replicas > 0:
765            self.assertEqual(0,
766                             (rest.get_replica_key_count(
767                                 bucket) / self.num_replicas),
768                             "Not all keys deleted from replica vbuckets. Expected No. of items : {0}, Item count per replica: {1}".format(
769                                 0, (rest.get_replica_key_count(
770                                     bucket) / self.num_replicas)))
771
772    def test_rebalance_in_out(self):
773        servs_out = [self.servers[self.nodes_init - i - 1] for i in
774                     range(self.nodes_out)]
775        self.log.info("Servers Out: {0}".format(servs_out))
776        rest = RestConnection(self.master)
777        bucket = rest.get_buckets()[0]
778        load_thread = self.load_docs()
779
780        if self.run_with_views:
781            self.log.info('creating ddocs and views')
782            self.create_ddocs_and_views()
783            self.log.info('starting the view query thread...')
784            view_query_thread = self.run_view_queries()
785            view_query_thread.start()
786
787        self.log.info('starting the load thread...')
788        load_thread.start()
789        load_thread.join()
790        load_thread = self.load_docs(num_items=(self.num_items * 2),
791                                     start_document=self.num_items)
792        load_thread.start()
793        rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init],
794                                                 self.servers[
795                                                 self.nodes_init:self.nodes_init + self.nodes_in],
796                                                 servs_out)
797        # rebalance.result()
798        rest.monitorRebalance(stop_if_loop=False)
799        load_thread.join()
800
801        if self.run_with_views:
802            view_query_thread.join()
803
804        num_items_to_validate = self.num_items * 3
805        errors = self.check_data(self.master, bucket, num_items_to_validate)
806        if errors:
807            self.log.info("Printing missing keys:")
808        for error in errors:
809            print error
810        if num_items_to_validate != rest.get_active_key_count(bucket):
811            self.fail(
812                "FATAL: Data loss detected!! Docs loaded : {0}, docs present: {1}".
813                    format(num_items_to_validate,
814                           rest.get_active_key_count(bucket)))
815        if self.num_replicas > 0:
816            self.assertEqual(num_items_to_validate,
817                             (rest.get_replica_key_count(
818                                 bucket) / self.num_replicas),
819                             "Not all keys present in replica vbuckets. Expected No. of items : {0}, Item count per replica: {1}".format(
820                                 num_items_to_validate, (
821                                 rest.get_replica_key_count(
822                                     bucket) / self.num_replicas)))
823
824    def test_rebalance_in_out_with_update_workload(self):
825        servs_out = [self.servers[self.nodes_init - i - 1] for i in
826                     range(self.nodes_out)]
827        rest = RestConnection(self.master)
828        bucket = rest.get_buckets()[0]
829        load_thread = self.load_docs()
830        if self.run_with_views:
831            self.log.info('creating ddocs and views')
832            self.create_ddocs_and_views()
833            self.log.info('starting the view query thread...')
834            view_query_thread = self.run_view_queries()
835            view_query_thread.start()
836        self.log.info('starting the load thread...')
837        load_thread.start()
838        load_thread.join()
839
840        update_thread = Thread(target=self.update_buckets_with_high_ops,
841                               name="update_high_ops_load",
842                               args=(
843                                   self.master, self.buckets[0], self.num_items,
844                                   self.num_items * 2, self.batch_size,
845                                   self.threads, 0, self.instances))
846
847        update_thread.start()
848        rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init],
849                                                 self.servers[
850                                                 self.nodes_init:self.nodes_init + self.nodes_in],
851                                                 servs_out)
852        # rebalance.result()
853        rest.monitorRebalance(stop_if_loop=False)
854        update_thread.join()
855
856        if self.run_with_views:
857            view_query_thread.join()
858
859        num_items_to_validate = self.num_items
860        errors = self.check_data(self.master, bucket, num_items_to_validate, 0,
861                                 True, self.num_items * 2)
862        if errors:
863            self.log.info("Printing missing keys:")
864        for error in errors:
865            print error
866        if num_items_to_validate != rest.get_active_key_count(bucket):
867            self.fail(
868                "FATAL: Data loss detected!! Docs loaded : {0}, docs present: {1}".
869                    format(num_items_to_validate,
870                           rest.get_active_key_count(bucket)))
871        else:
872            if errors:
873                self.fail(
874                    "FATAL : Few mutations missed. See above for details.")
875
876        if self.num_replicas > 0:
877            self.assertEqual(num_items_to_validate,
878                             (rest.get_replica_key_count(
879                                 bucket) / self.num_replicas),
880                             "Not all keys present in replica vbuckets. Expected No. of items : {0}, Item count per replica: {1}".format(
881                                 num_items_to_validate, (
882                                     rest.get_replica_key_count(
883                                         bucket) / self.num_replicas)))
884
885    def test_rebalance_in_out_with_delete_workload(self):
886        servs_out = [self.servers[self.nodes_init - i - 1] for i in
887                     range(self.nodes_out)]
888        rest = RestConnection(self.master)
889        bucket = rest.get_buckets()[0]
890        load_thread = self.load_docs()
891        if self.run_with_views:
892            self.log.info('creating ddocs and views')
893            self.create_ddocs_and_views()
894            self.log.info('starting the view query thread...')
895            view_query_thread = self.run_view_queries()
896            view_query_thread.start()
897        self.log.info('starting the load thread...')
898        load_thread.start()
899        load_thread.join()
900
901        delete_thread = Thread(target=self.delete_buckets_with_high_ops,
902                               name="delete_high_ops_load",
903                               args=(
904                                   self.master, self.buckets[0], self.num_items,
905                                   self.num_items, self.batch_size,
906                                   self.threads, 0, self.instances))
907
908        delete_thread.start()
909        rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init],
910                                                 self.servers[
911                                                 self.nodes_init:self.nodes_init + self.nodes_in],
912                                                 servs_out)
913        # rebalance.result()
914        rest.monitorRebalance(stop_if_loop=False)
915        delete_thread.join()
916
917        if self.run_with_views:
918            view_query_thread.join()
919
920        num_items_to_validate = self.num_items
921        errors = self.check_data(self.master, bucket, num_items_to_validate, 0,
922                                 deleted=True,
923                                 deleted_items=num_items_to_validate)
924
925        if errors:
926            self.log.info("Printing missing keys:")
927        for error in errors:
928            print error
929        if rest.get_active_key_count(bucket) != 0:
930            self.fail(
931                "FATAL: Data loss detected!! Docs Deleted : {0}, docs present: {1}".
932                    format(num_items_to_validate,
933                           rest.get_active_key_count(bucket)))
934        else:
935            if errors:
936                self.fail(
937                    "FATAL : Few mutations missed. See above for details.")
938
939        if self.num_replicas > 0:
940            self.assertEqual(0,
941                             (rest.get_replica_key_count(
942                                 bucket) / self.num_replicas),
943                             "Not all keys deleted from replica vbuckets. Expected No. of items : {0}, Item count per replica: {1}".format(
944                                 num_items_to_validate, (
945                                     rest.get_replica_key_count(
946                                         bucket) / self.num_replicas)))
947
948
949    def test_rebalance_in_out_with_expiry(self):
950        servs_out = [self.servers[self.nodes_init - i - 1] for i in
951                     range(self.nodes_out)]
952        rest = RestConnection(self.master)
953        bucket = rest.get_buckets()[0]
954        load_thread = self.load_docs(ttl=10)
955        if self.run_with_views:
956            self.log.info('creating ddocs and views')
957            self.create_ddocs_and_views()
958            self.log.info('starting the view query thread...')
959            view_query_thread = self.run_view_queries()
960            view_query_thread.start()
961        self.log.info('starting the load thread...')
962        load_thread.start()
963        load_thread.join()
964
965        # Allow docs to expire
966        self.sleep(15)
967
968        validate_thread = Thread(target=self.check_data,
969                                 name="update_high_ops_load",
970                                 args=(
971                                     self.master, bucket, self.num_items, 0,
972                                     False,
973                                     0, 100))
974
975        validate_thread.start()
976        rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init],
977                                                 self.servers[
978                                                 self.nodes_init:self.nodes_init + self.nodes_in],
979                                                 servs_out)
980        # rebalance.result()
981        rest.monitorRebalance(stop_if_loop=False)
982        validate_thread.join()
983
984        if self.run_with_views:
985            view_query_thread.join()
986
987        num_items_to_validate = self.num_items
988        errors = self.check_data(self.master, bucket, num_items_to_validate,
989                                 ttl=10)
990        if errors:
991            self.log.info("Printing missing keys:")
992        for error in errors:
993            print error
994        if rest.get_active_key_count(bucket) != 0:
995            self.fail(
996                "FATAL: Data loss detected!! Docs expired : {0}, docs present: {1}".
997                    format(num_items_to_validate,
998                           rest.get_active_key_count(bucket)))
999        else:
1000            if errors:
1001                self.fail(
1002                    "FATAL : Few mutations missed. See above for details.")
1003
1004        if self.num_replicas > 0:
1005            self.assertEqual(0,
1006                             (rest.get_replica_key_count(
1007                                 bucket) / self.num_replicas),
1008                             "Not all keys deleted from replica vbuckets. Expected No. of items : {0}, Item count per replica: {1}".format(
1009                                 0, (rest.get_replica_key_count(
1010                                     bucket) / self.num_replicas)))
1011
1012    def test_graceful_failover_addback(self):
1013        node_out = self.servers[self.node_out]
1014        rest = RestConnection(self.master)
1015        bucket = rest.get_buckets()[0]
1016        load_thread = self.load_docs()
1017        self.log.info('starting the load thread...')
1018        load_thread.start()
1019        load_thread.join()
1020        load_thread = self.load_docs(num_items=(self.num_items * 2),
1021                                     start_document=self.num_items)
1022        load_thread.start()
1023        nodes_all = rest.node_statuses()
1024        for node in nodes_all:
1025            if node.ip == node_out.ip:
1026                break
1027
1028        failover_task = self.cluster.async_failover(
1029            self.servers[:self.nodes_init],
1030            [node_out],
1031            "graceful", wait_for_pending=360)
1032
1033        failover_task.result()
1034
1035        rest.set_recovery_type(node.id, self.recovery_type)
1036        rest.add_back_node(node.id)
1037
1038        rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init],
1039                                                 [], [])
1040
1041        reached = RestHelper(rest).rebalance_reached()
1042        self.assertTrue(reached, "rebalance failed, stuck or did not complete")
1043        load_thread.join()
1044        num_items_to_validate = self.num_items * 3
1045        errors = self.check_data(self.master, bucket, num_items_to_validate)
1046        if errors:
1047            self.log.info("Printing missing keys:")
1048        for error in errors:
1049            print error
1050        if num_items_to_validate != rest.get_active_key_count(bucket):
1051            self.fail(
1052                "FATAL: Data loss detected!! Docs loaded : {0}, docs present: {1}".
1053                    format(num_items_to_validate,
1054                           rest.get_active_key_count(bucket)))
1055        if self.num_replicas > 0:
1056            self.assertEqual(num_items_to_validate,
1057                             (rest.get_replica_key_count(
1058                                 bucket) / self.num_replicas),
1059                             "Not all keys present in replica vbuckets. Expected No. of items : {0}, Item count per replica: {1}".format(
1060                                 num_items_to_validate, (
1061                                 rest.get_replica_key_count(
1062                                     bucket) / self.num_replicas)))
1063
1064    def test_multiple_rebalance_in_out(self):
1065        servs_out = [self.servers[self.nodes_init - i - 1] for i in
1066                     range(self.nodes_out)]
1067        self.log.info("Servers Out: {0}".format(servs_out))
1068        rest = RestConnection(self.master)
1069        bucket = rest.get_buckets()[0]
1070        load_thread = self.load_docs()
1071        self.log.info('starting the initial load...')
1072        load_thread.start()
1073        load_thread.join()
1074
1075        self.log.info('starting the load before rebalance in...')
1076        load_thread = self.load_docs(num_items=(self.num_items * 2),
1077                                     start_document=self.num_items)
1078        load_thread.start()
1079
1080        # Add 1 node
1081        rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init],
1082                                                 self.servers[
1083                                                 self.nodes_init:self.nodes_init + self.nodes_in],
1084                                                 [])
1085        # rebalance.result()
1086        rest.monitorRebalance(stop_if_loop=False)
1087        load_thread.join()
1088        num_items_to_validate = self.num_items * 3
1089        errors = self.check_data(self.master, bucket, num_items_to_validate)
1090        if errors:
1091            self.log.info("Printing missing keys:")
1092        for error in errors:
1093            print error
1094        if num_items_to_validate != rest.get_active_key_count(bucket):
1095            self.fail(
1096                "FATAL: Data loss detected!! Docs loaded : {0}, docs present: {1}".
1097                    format(num_items_to_validate,
1098                           rest.get_active_key_count(bucket)))
1099        if self.num_replicas > 0:
1100            self.assertEqual(num_items_to_validate,
1101                             (rest.get_replica_key_count(
1102                                 bucket) / self.num_replicas),
1103                             "Not all keys present in replica vbuckets. Expected No. of items : {0}, Item count per replica: {1}".format(
1104                                 num_items_to_validate, (
1105                                 rest.get_replica_key_count(
1106                                     bucket) / self.num_replicas)))
1107
1108        self.log.info('starting the load before rebalance out...')
1109        load_thread = self.load_docs(num_items=(self.num_items * 2),
1110                                         start_document=self.num_items*3)
1111
1112        load_thread.start()
1113        # Remove 1 node
1114        rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init],
1115                                                 [], servs_out)
1116        # rebalance.result()
1117        rest.monitorRebalance(stop_if_loop=False)
1118        load_thread.join()
1119        num_items_to_validate = self.num_items * 5
1120        errors = self.check_data(self.master, bucket, num_items_to_validate)
1121        if errors:
1122            self.log.info("Printing missing keys:")
1123        for error in errors:
1124            print error
1125        if num_items_to_validate != rest.get_active_key_count(bucket):
1126            self.fail(
1127                "FATAL: Data loss detected!! Docs loaded : {0}, docs present: {1}".
1128                    format(num_items_to_validate,
1129                           rest.get_active_key_count(bucket)))
1130        if self.num_replicas > 0:
1131            self.assertEqual(num_items_to_validate,
1132                             (rest.get_replica_key_count(
1133                                 bucket) / self.num_replicas),
1134                             "Not all keys present in replica vbuckets. Expected No. of items : {0}, Item count per replica: {1}".format(
1135                                 num_items_to_validate, (
1136                                 rest.get_replica_key_count(
1137                                     bucket) / self.num_replicas)))
1138
1139        self.log.info('starting the load before swap rebalance...')
1140        load_thread = self.load_docs(num_items=(self.num_items * 2),
1141                                         start_document=self.num_items * 5)
1142
1143        load_thread.start()
1144        # Swap rebalance 1 node
1145        rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init],
1146                                                 self.servers[
1147                                                 self.nodes_init:self.nodes_init + self.nodes_in],
1148                                                 servs_out)
1149        rebalance.result()
1150        load_thread.join()
1151        num_items_to_validate = self.num_items * 7
1152        errors = self.check_data(self.master, bucket, num_items_to_validate)
1153        if errors:
1154            self.log.info("Printing missing keys:")
1155        for error in errors:
1156            print error
1157        if num_items_to_validate != rest.get_active_key_count(bucket):
1158            self.fail(
1159                "FATAL: Data loss detected!! Docs loaded : {0}, docs present: {1}".
1160                    format(num_items_to_validate,
1161                           rest.get_active_key_count(bucket)))
1162        if self.num_replicas > 0:
1163            self.assertEqual(num_items_to_validate,
1164                             (rest.get_replica_key_count(
1165                                 bucket) / self.num_replicas),
1166                             "Not all keys present in replica vbuckets. Expected No. of items : {0}, Item count per replica: {1}".format(
1167                                 num_items_to_validate, (
1168                                 rest.get_replica_key_count(
1169                                     bucket) / self.num_replicas)))
1170
1171    def test_start_stop_rebalance_multiple_times(self):
1172        rest = RestConnection(self.master)
1173        bucket = rest.get_buckets()[0]
1174        load_thread = self.load_docs()
1175
1176        self.log.info('starting the load thread...')
1177        load_thread.start()
1178        load_thread.join()
1179        load_thread = self.load_docs(num_items=(self.num_items * 2),
1180                                     start_document=self.num_items)
1181        load_thread.start()
1182        rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init],
1183                                                 self.servers[
1184                                                 self.nodes_init:self.nodes_init + self.nodes_in],
1185                                                 [])
1186        # rebalance.result()
1187        rest.monitorRebalance(stop_if_loop=False)
1188        for i in range(1,100):
1189            self.sleep(20)
1190            stopped = rest.stop_rebalance(wait_timeout=10)
1191            self.assertTrue(stopped, msg="Unable to stop rebalance in iteration {0}".format(i))
1192            self.sleep(10)
1193            rebalance = self.cluster.async_rebalance(
1194                self.servers[:self.nodes_init], [], [])
1195            # rebalance.result()
1196            rest.monitorRebalance(stop_if_loop=False)
1197        load_thread.join()
1198
1199        num_items_to_validate = self.num_items * 3
1200        errors = self.check_data(self.master, bucket, num_items_to_validate)
1201        if errors:
1202            self.log.info("Printing missing keys:")
1203        for error in errors:
1204            print error
1205        if num_items_to_validate != rest.get_active_key_count(bucket):
1206            self.fail(
1207                "FATAL: Data loss detected!! Docs loaded : {0}, docs present: {1}".
1208                    format(num_items_to_validate,
1209                           rest.get_active_key_count(bucket)))
1210        if self.num_replicas > 0:
1211            self.assertEqual(num_items_to_validate,
1212                             (rest.get_replica_key_count(
1213                                 bucket) / self.num_replicas),
1214                             "Not all keys present in replica vbuckets. Expected No. of items : {0}, Item count per replica: {1}".format(
1215                                 num_items_to_validate, (
1216                                     rest.get_replica_key_count(
1217                                         bucket) / self.num_replicas)))
1218
1219    def test_rebalance_in_with_indexer_node(self):
1220        rest = RestConnection(self.master)
1221        #rest.add_node(self.servers[self.nodes_init].rest_username,
1222        #                  self.servers[self.nodes_init].rest_password,
1223        #                  self.servers[self.nodes_init].ip, services=['index','n1ql'])
1224
1225        rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init],
1226                                                 [self.servers[self.nodes_init]],
1227                                                 [], ["index","n1ql"])
1228
1229        # rebalance.result()
1230        rest.monitorRebalance(stop_if_loop=False)
1231        bucket = rest.get_buckets()[0]
1232
1233        # Create Secondary index
1234        create_index_statement = "create index idx1 on default(body) using GSI"
1235        self.run_n1ql_query(create_index_statement)
1236
1237        load_thread = self.load_docs()
1238
1239        self.log.info('starting the load thread...')
1240        load_thread.start()
1241        load_thread.join()
1242        load_thread = self.load_docs(num_items=(self.num_items * 2),
1243                                     start_document=self.num_items)
1244        load_thread.start()
1245        rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init],
1246                                                 self.servers[
1247                                                 (self.nodes_init + 1):(self.nodes_init + self.nodes_in + 1)],
1248                                                 [])
1249        # rebalance.result()
1250        rest.monitorRebalance(stop_if_loop=False)
1251        load_thread.join()
1252
1253        num_items_to_validate = self.num_items * 3
1254        errors = self.check_data(self.master, bucket, num_items_to_validate)
1255        if errors:
1256            self.log.info("Printing missing keys:")
1257        for error in errors:
1258            print error
1259        if num_items_to_validate != rest.get_active_key_count(bucket):
1260            self.fail(
1261                "FATAL: Data loss detected!! Docs loaded : {0}, docs present: {1}".
1262                    format(num_items_to_validate,
1263                           rest.get_active_key_count(bucket)))
1264        if self.num_replicas > 0:
1265            self.assertEqual(num_items_to_validate,
1266                             (rest.get_replica_key_count(
1267                                 bucket) / self.num_replicas),
1268                             "Not all keys present in replica vbuckets. Expected No. of items : {0}, Item count per replica: {1}".format(
1269                                 num_items_to_validate, (
1270                                     rest.get_replica_key_count(
1271                                         bucket) / self.num_replicas)))
1272
1273        # Fetch count of indexed documents
1274        query = "select count(body) from default where body is not missing"
1275        count = self.run_n1ql_query(create_index_statement)
1276        self.assertEqual(num_items_to_validate, count, "Indexed document count not as expected. It is {0}, expected : {1}".format(count,num_items_to_validate))
1277
1278    def run_n1ql_query(self, query):
1279        self.n1ql_node = self.get_nodes_from_services_map(service_type="n1ql")
1280        self.n1ql_helper = N1QLHelper(shell=self.shell,
1281                                      max_verify=self.max_verify,
1282                                      buckets=self.buckets,
1283                                      item_flag=self.item_flag,
1284                                      n1ql_port=self.n1ql_port,
1285                                      full_docs_list=self.full_docs_list,
1286                                      log=self.log, input=self.input,
1287                                      master=self.master,
1288                                      use_rest=True
1289                                      )
1290
1291        return self.n1ql_helper.run_cbq_query(query=query, server=self.n1ql_node)
1292
1293    def run_view_queries(self):
1294        view_query_thread = Thread(target=self.view_queries, name="run_queries",
1295                                   args=(self.run_view_query_iterations,))
1296        return view_query_thread
1297
1298
1299    def view_queries(self, iterations):
1300        query = {"connectionTimeout": 60000}
1301        for count in xrange(iterations):
1302            for i in xrange(self.view_num):
1303                self.cluster.query_view(self.master, self.ddocs[0].name,
1304                                        self.default_view_name + str(i), query,
1305                                        expected_rows=None, bucket="default",
1306                                        retry_time=2)
1307
1308
1309    def create_ddocs_and_views(self):
1310        self.default_view = View(self.default_view_name, None, None)
1311        for bucket in self.buckets:
1312            for i in xrange(int(self.ddocs_num)):
1313                views = self.make_default_views(self.default_view_name,
1314                                                self.view_num,
1315                                                self.is_dev_ddoc,
1316                                                different_map=True)
1317                ddoc = DesignDocument(self.default_view_name + str(i), views)
1318                self.ddocs.append(ddoc)
1319                for view in views:
1320                    self.cluster.create_view(self.master, ddoc.name, view,
1321                                             bucket=bucket)
1322