1import time, os
2
3from threading import Thread
4import threading
5from basetestcase import BaseTestCase
6from rebalance.rebalance_base import RebalanceBaseTest
7from membase.api.exception import RebalanceFailedException
8from membase.api.rest_client import RestConnection, RestHelper
9from couchbase_helper.documentgenerator import BlobGenerator
10from membase.helper.rebalance_helper import RebalanceHelper
11from remote.remote_util import RemoteMachineShellConnection
12from membase.helper.cluster_helper import ClusterOperationHelper
13
14class RebalanceInTests(RebalanceBaseTest):
15
16    def setUp(self):
17        super(RebalanceInTests, self).setUp()
18
19    def tearDown(self):
20        super(RebalanceInTests, self).tearDown()
21
22    """Rebalances nodes into a cluster while doing docs ops:create, delete, update.
23
24    This test begins by loading a given number of items into the cluster. It then
25    adds nodes_in nodes at a time and rebalances that nodes into the cluster.
26    During the rebalance we perform docs ops(add/remove/update/readd)
27    in the cluster( operate with a half of items that were loaded before).
28    Once the cluster has been rebalanced we wait for the disk queues to drain,
29    then verify that there has been no data loss and sum(curr_items) match the curr_items_total.
30    Once all nodes have been rebalanced in the test is finished."""
31    def rebalance_in_after_ops(self):
32        gen_update = BlobGenerator('mike', 'mike-', self.value_size, end=self.num_items)
33
34        tasks = list()
35        tasks += self._async_load_all_buckets(self.master, gen_update, "update", 0)
36        for task in tasks:
37            task.result()
38        servs_in = [self.servers[i + self.nodes_init]
39                    for i in range(self.nodes_in)]
40        self._verify_stats_all_buckets(self.servers[:self.nodes_init], timeout=120)
41        self._wait_for_stats_all_buckets(self.servers[:self.nodes_init])
42        self.sleep(20)
43        prev_failover_stats = self.get_failovers_logs(self.servers[:self.nodes_init], self.buckets)
44        prev_vbucket_stats = self.get_vbucket_seqnos(self.servers[:self.nodes_init], self.buckets)
45        disk_replica_dataset, disk_active_dataset = self.get_and_compare_active_replica_data_set_all(self.servers[:self.nodes_init], self.buckets, path=None)
46        self.compare_vbucketseq_failoverlogs(prev_vbucket_stats, prev_failover_stats)
47        rebalance = self.cluster.async_rebalance(
48            self.servers[:self.nodes_init], servs_in, [],
49            sleep_before_rebalance=self.sleep_before_rebalance)
50        rebalance.result()
51        self._verify_stats_all_buckets(self.servers[:self.nodes_in + self.nodes_init], timeout=120)
52        self.verify_cluster_stats(self.servers[:self.nodes_in + self.nodes_init], check_ep_items_remaining = True)
53        new_failover_stats = self.compare_failovers_logs(prev_failover_stats, self.servers[:self.nodes_in + self.nodes_init], self.buckets)
54        new_vbucket_stats = self.compare_vbucket_seqnos(prev_vbucket_stats, self.servers[:self.nodes_in + self.nodes_init], self.buckets)
55        self.compare_vbucketseq_failoverlogs(new_vbucket_stats, new_failover_stats)
56        self.sleep(30)
57        self.data_analysis_active_replica_all(disk_active_dataset, disk_replica_dataset, self.servers[:self.nodes_in + self.nodes_init], self.buckets, path=None)
58        self.verify_unacked_bytes_all_buckets()
59
60        nodes = self.get_nodes_in_cluster(self.master)
61        self.vb_distribution_analysis(
62            servers=nodes, buckets=self.buckets, std=1.0,
63            total_vbuckets=self.total_vbuckets)
64
65        # Validate seq_no snap_start/stop values after rebalance
66        self.check_snap_start_corruption()
67
68    """Rebalances nodes in with failover and full recovery add back of a node
69
70    This test begins by loading a given number of items into the cluster. It then
71    adds nodes_in nodes at a time and rebalances that nodes into the cluster.
72    During the rebalance we perform docs ops(add/remove/update/readd)
73    in the cluster( operate with a half of items that were loaded before).
74    Once the cluster has been rebalanced we wait for the disk queues to drain,
75    then verify that there has been no data loss and sum(curr_items) match the curr_items_total.
76    Once all nodes have been rebalanced in the test is finished."""
77    def rebalance_in_with_failover_full_addback_recovery(self):
78        gen_update = BlobGenerator('mike', 'mike-', self.value_size, end=self.num_items)
79        tasks = []
80        tasks += self._async_load_all_buckets(self.master, gen_update, "update", 0)
81        for task in tasks:
82            task.result()
83        servs_in = [self.servers[i + self.nodes_init] for i in range(self.nodes_in)]
84        self._verify_stats_all_buckets(self.servers[:self.nodes_init], timeout=120)
85        self._wait_for_stats_all_buckets(self.servers[:self.nodes_init])
86        self.sleep(20)
87        prev_failover_stats = self.get_failovers_logs(self.servers[:self.nodes_init], self.buckets)
88        prev_vbucket_stats = self.get_vbucket_seqnos(self.servers[:self.nodes_init], self.buckets)
89        disk_replica_dataset, disk_active_dataset = self.get_and_compare_active_replica_data_set_all(self.servers[:self.nodes_init], self.buckets, path=None)
90        self.rest = RestConnection(self.master)
91        self.nodes = self.get_nodes(self.master)
92
93        chosen = RebalanceHelper.pick_nodes(self.master, howmany=1)
94        # Mark Node for failover
95        success_failed_over = self.rest.fail_over(chosen[0].id, graceful=False)
96
97        # Perform doc-mutation after node failover
98        tasks = self._async_load_all_buckets(
99            self.master, gen_update, "update", 0)
100        for task in tasks:
101            task.result()
102
103        # Mark Node for full recovery
104        if success_failed_over:
105            self.rest.set_recovery_type(otpNode=chosen[0].id, recoveryType="full")
106
107        rebalance = self.cluster.async_rebalance(
108            self.servers[:self.nodes_init], servs_in, [],
109            sleep_before_rebalance=self.sleep_before_rebalance)
110        rebalance.result()
111
112        # Validate seq_no snap_start/stop values after rebalance
113        self.check_snap_start_corruption()
114
115        self._verify_stats_all_buckets(self.servers[:self.nodes_in + self.nodes_init], timeout=120)
116        self.verify_cluster_stats(self.servers[:self.nodes_in + self.nodes_init], check_ep_items_remaining = True)
117        self.compare_failovers_logs(prev_failover_stats, self.servers[:self.nodes_in + self.nodes_init], self.buckets)
118        self.sleep(30)
119        self.data_analysis_active_replica_all(disk_active_dataset, disk_replica_dataset, self.servers[:self.nodes_in + self.nodes_init], self.buckets, path=None)
120        self.verify_unacked_bytes_all_buckets()
121        nodes = self.get_nodes_in_cluster(self.master)
122        self.vb_distribution_analysis(servers = nodes, buckets = self.buckets, std = 1.0 , total_vbuckets = self.total_vbuckets)
123
124    """Rebalances  after we do add node and graceful failover
125
126    This test begins by loading a given number of items into the cluster. It then
127    adds nodes_in nodes at a time and rebalances that nodes into the cluster.
128    During the rebalance we perform docs ops(add/remove/update/readd)
129    in the cluster( operate with a half of items that were loaded before).
130    We then  add a node and do graceful failover followed by rebalance
131    Once the cluster has been rebalanced we wait for the disk queues to drain,
132    then verify that there has been no data loss and sum(curr_items) match the curr_items_total.
133    Once all nodes have been rebalanced in the test is finished."""
134    def rebalance_in_with_failover(self):
135        fail_over = self.input.param("fail_over", False)
136        gen_update = BlobGenerator('mike', 'mike-', self.value_size, end=self.num_items)
137        tasks = []
138        tasks += self._async_load_all_buckets(self.master, gen_update, "update", 0)
139        for task in tasks:
140            task.result()
141        servs_in = [self.servers[i + self.nodes_init] for i in range(self.nodes_in)]
142        self._verify_stats_all_buckets(self.servers[:self.nodes_init], timeout=120)
143        self._wait_for_stats_all_buckets(self.servers[:self.nodes_init])
144        self.sleep(20)
145        prev_failover_stats = self.get_failovers_logs(self.servers[:self.nodes_init], self.buckets)
146        prev_vbucket_stats = self.get_vbucket_seqnos(self.servers[:self.nodes_init], self.buckets)
147        disk_replica_dataset, disk_active_dataset = self.get_and_compare_active_replica_data_set_all(self.servers[:self.nodes_init], self.buckets, path=None)
148        self.rest = RestConnection(self.master)
149        self.nodes = self.get_nodes(self.master)
150        chosen = RebalanceHelper.pick_nodes(self.master, howmany=1)
151        self.rest = RestConnection(self.master)
152        self.rest.add_node(self.master.rest_username,
153                           self.master.rest_password,
154                           self.servers[self.nodes_init].ip,
155                           self.servers[self.nodes_init].port)
156
157        # Perform doc-mutation after add-node
158        tasks = self._async_load_all_buckets(self.master, gen_update, "update", 0)
159        for task in tasks:
160            task.result()
161
162        # Validate seq_no snap_start/stop values after rebalance
163        self.check_snap_start_corruption()
164
165        # Mark Node for failover
166        self.rest.fail_over(chosen[0].id, graceful=fail_over)
167
168        # Perform doc-mutation after node failover
169        tasks = self._async_load_all_buckets(self.master, gen_update, "update", 0)
170        for task in tasks:
171            task.result()
172
173        if fail_over:
174            self.assertTrue(self.rest.monitorRebalance(stop_if_loop=True), msg="Graceful Failover Failed")
175        self.nodes = self.rest.node_statuses()
176        self.rest.rebalance(otpNodes=[node.id for node in self.nodes],ejectedNodes=[chosen[0].id])
177        self.assertTrue(self.rest.monitorRebalance(stop_if_loop=True), msg="Rebalance Failed")
178
179        # Validate seq_no snap_start/stop values after rebalance
180        self.check_snap_start_corruption()
181
182        # Verification
183        new_server_list = self.add_remove_servers(self.servers,self.servers[:self.nodes_init],[chosen[0]],[self.servers[self.nodes_init]])
184        self._verify_stats_all_buckets(new_server_list, timeout=120)
185        self.verify_cluster_stats(new_server_list, check_ep_items_remaining = True)
186        self.compare_failovers_logs(prev_failover_stats, new_server_list, self.buckets)
187        self.sleep(30)
188        self.data_analysis_active_replica_all(disk_active_dataset, disk_replica_dataset, new_server_list, self.buckets, path=None)
189        self.verify_unacked_bytes_all_buckets()
190        nodes = self.get_nodes_in_cluster(self.master)
191        self.vb_distribution_analysis(servers = nodes, buckets = self.buckets, std = 1.0 , total_vbuckets = self.total_vbuckets)
192
193    """Rebalances nodes into a cluster while doing docs ops:create, delete, update.
194
195    This test begins by loading a given number of items into the cluster. It then
196    adds nodes_in nodes at a time and rebalances that nodes into the cluster.
197    During the rebalance we perform docs ops(add/remove/update/readd)
198    in the cluster( operate with a half of items that were loaded before).
199    Once the cluster has been rebalanced we wait for the disk queues to drain,
200    then verify that there has been no data loss and sum(curr_items) match the curr_items_total.
201    Once all nodes have been rebalanced in the test is finished."""
202    def rebalance_in_with_ops(self):
203        tasks = list()
204        gen_delete = BlobGenerator('mike', 'mike-', self.value_size, start=self.num_items / 2, end=self.num_items)
205        gen_create = BlobGenerator('mike', 'mike-', self.value_size, start=self.num_items + 1, end=self.num_items * 3/2)
206        servs_in = [self.servers[i + self.nodes_init] for i in range(self.nodes_in)]
207        if self.doc_ops is not None:
208            # define which doc's ops will be performed during rebalancing
209            # allows multiple of them but one by one
210            if "update" in self.doc_ops:
211                tasks += self._async_load_all_buckets(self.master, self.gen_update, "update", 0)
212            if "create" in self.doc_ops:
213                tasks += self._async_load_all_buckets(self.master, gen_create, "create", 0)
214            if "delete" in self.doc_ops:
215                tasks += self._async_load_all_buckets(self.master, gen_delete, "delete", 0)
216
217        rebalance_task = self.cluster.async_rebalance(
218            self.servers[:self.nodes_init], servs_in, [],
219            sleep_before_rebalance=self.sleep_before_rebalance)
220
221        rebalance_task.result()
222        for task in tasks:
223            task.result()
224
225        self.sleep(10, "Sleep before stat verification")
226        self.verify_cluster_stats(self.servers[:self.nodes_in + self.nodes_init])
227        self.verify_unacked_bytes_all_buckets()
228
229        # Validate seq_no snap_start/stop values after rebalance
230        self.check_snap_start_corruption()
231
232    """Rebalances nodes into a cluster while doing docs ops:create, delete, update.
233
234    This test begins by loading a given number of items into the cluster.
235    We later run compaction on all buckets and do ops as well
236    """
237    def rebalance_in_with_compaction_and_ops(self):
238        tasks = list()
239        servs_in = [self.servers[i + self.nodes_init]
240                    for i in range(self.nodes_in)]
241
242        for bucket in self.buckets:
243            tasks.append(self.cluster.async_compact_bucket(self.master,
244                                                           bucket))
245        if self.doc_ops is not None:
246            if "update" in self.doc_ops:
247                # 1/2th of data will be updated in each iteration
248                tasks += self._async_load_all_buckets(
249                    self.master, self.gen_update, "update", 0,
250                    batch_size=20000, pause_secs=5, timeout_secs=180)
251            elif "create" in self.doc_ops:
252                # 1/2th of initial data will be added in each iteration
253                gen_create = BlobGenerator(
254                    'mike', 'mike-', self.value_size,
255                    start=self.num_items * (1 + i) / 2.0,
256                    end=self.num_items * (1 + i / 2.0))
257                tasks += self._async_load_all_buckets(
258                    self.master, gen_create, "create", 0,
259                    batch_size=20000, pause_secs=5, timeout_secs=180)
260            elif "delete" in self.doc_ops:
261                 # 1/(num_servers) of initial data will be removed after each iteration
262                # at the end we should get empty base( or couple items)
263                gen_delete = BlobGenerator('mike', 'mike-', self.value_size,
264                                           start=int(self.num_items * (1 - i / (self.num_servers - 1.0))) + 1,
265                                           end=int(self.num_items * (1 - (i - 1) / (self.num_servers - 1.0))))
266                tasks += self._async_load_all_buckets(
267                    self.master, gen_delete, "delete", 0,
268                    batch_size=20000, pause_secs=5, timeout_secs=180)
269
270        rebalance_task = self.cluster.async_rebalance(
271            self.servers[:self.nodes_init], servs_in, [],
272            sleep_before_rebalance=self.sleep_before_rebalance)
273
274        rebalance_task.result()
275        for task in tasks:
276            task.result()
277
278        self.verify_cluster_stats(self.servers[:self.nodes_in+self.nodes_init])
279        self.verify_unacked_bytes_all_buckets()
280
281        # Validate seq_no snap_start/stop values after rebalance
282        self.check_snap_start_corruption()
283
284    def rebalance_in_with_compaction_and_expiration_ops(self):
285        self.total_loader_threads = self.input.param("total_loader_threads", 10)
286        self.expiry_items = self.input.param("expiry_items", 100000)
287        self.max_expiry = self.input.param("max_expiry", 30)
288        self.rebalance_attempts = self.input.param("rebalance_attempts", 100)
289        thread_list = []
290        self._expiry_pager(self.master, val=1000000)
291        for bucket in self.buckets:
292            RestConnection(self.master).set_auto_compaction(dbFragmentThreshold=100, bucket = bucket.name)
293        num_items = self.expiry_items
294        expiry_range = self.max_expiry
295        for x in range(1,self.total_loader_threads):
296            t = threading.Thread(target=self.run_mc_bin_client, args = (num_items, expiry_range))
297            t.daemon = True
298            t.start()
299            thread_list.append(t)
300        for t in thread_list:
301            t.join()
302        for x in range(1,self.total_loader_threads):
303            num_items = 1000000
304            t = threading.Thread(target=self.run_mc_bin_client, args = (num_items, expiry_range))
305            t.daemon = True
306            t.start()
307            thread_list.append(t)
308        self.sleep(20)
309        tasks = []
310        for x in range(1,self.rebalance_attempts):
311            servs_in = [self.servers[i + self.nodes_init] for i in range(self.nodes_in)]
312            rebalance = self.cluster.async_rebalance(
313                self.servers[:self.nodes_init], servs_in, [],
314                sleep_before_rebalance=self.sleep_before_rebalance)
315            rebalance.result()
316            self.servers  = self.servers[:self.nodes_init+len(servs_in)]
317            servs_out = self.servers[len(self.servers) - self.nodes_out:]
318            rebalance = self.cluster.async_rebalance(
319                self.servers[:1], [], servs_out,
320                sleep_before_rebalance=self.sleep_before_rebalance)
321            rebalance.result()
322        t = threading.Thread(target=self._run_compaction)
323        t.daemon = True
324        t.start()
325        thread_list.append(t)
326        for task in tasks:
327            task.result()
328
329    def rebalance_in_with_ops_batch(self):
330        gen_delete = BlobGenerator('mike', 'mike-', self.value_size, start=(self.num_items / 2 - 1), end=self.num_items)
331        gen_create = BlobGenerator('mike', 'mike-', self.value_size, start=self.num_items + 1, end=self.num_items * 3 / 2)
332        servs_in = [self.servers[i + 1] for i in range(self.nodes_in)]
333        rebalance = self.cluster.async_rebalance(
334            self.servers[:1], servs_in, [],
335            sleep_before_rebalance=self.sleep_before_rebalance)
336        if self.doc_ops is not None:
337            # define which doc's ops will be performed during rebalancing
338            # allows multiple of them but one by one
339            if "update" in self.doc_ops:
340                self._load_all_buckets(self.servers[0], self.gen_update, "update", 0, 1, 4294967295, True, batch_size=20000, pause_secs=5, timeout_secs=180)
341            if "create" in self.doc_ops:
342                self._load_all_buckets(self.servers[0], gen_create, "create", 0, 1, 4294967295, True, batch_size=20000, pause_secs=5, timeout_secs=180)
343            if "delete" in self.doc_ops:
344                self._load_all_buckets(self.servers[0], gen_delete, "delete", 0, 1, 4294967295, True, batch_size=20000, pause_secs=5, timeout_secs=180)
345        rebalance.result()
346        self._wait_for_stats_all_buckets(self.servers[:self.nodes_in + 1])
347        self._verify_all_buckets(self.master, 1, 1000, None, only_store_hash=True, batch_size=5000)
348        self._verify_stats_all_buckets(self.servers[:self.nodes_in + 1])
349        self.verify_unacked_bytes_all_buckets()
350
351    """Rebalances nodes into a cluster during getting random keys.
352
353    This test begins by loading a given number of items into the node.
354    Then it creates cluster with self.nodes_init nodes. Then we
355    send requests to all nodes in the cluster to get random key values.
356    Next step is add nodes_in nodes into cluster and rebalance it.
357    During rebalancing we get random keys from all nodes and verify
358    that are different every time.
359    Once the cluster has been rebalanced we again get random keys
360    from all new nodes in the cluster, than we wait for the disk queues
361    to drain, and then verify that there has been no data loss,
362    sum(curr_items) match the curr_items_total."""
363    def rebalance_in_get_random_key(self):
364        servs_in = self.servers[self.nodes_init:self.nodes_init+self.nodes_in]
365        rebalance = self.cluster.async_rebalance(
366            self.servers[:1], servs_in, [])
367        self.sleep(5)
368        rest_cons = [RestConnection(self.servers[i])
369                     for i in xrange(self.nodes_init)]
370        result = []
371        num_iter = 0
372        # get random keys for each node during rebalancing
373        while rest_cons[0]._rebalance_progress_status() == 'running' and num_iter < 100:
374            list_threads = []
375            temp_result = []
376            self.log.info("getting random keys for all nodes in cluster....")
377            for rest in rest_cons:
378                t = Thread(target=rest.get_random_key,
379                           name="get_random_key",
380                           args=(self.default_bucket_name,))
381                list_threads.append(t)
382                temp_result.append(rest.get_random_key(self.default_bucket_name))
383
384                t.start()
385            [t.join() for t in list_threads]
386
387            if tuple(temp_result) == tuple(result):
388                self.log.exception("random keys are not changed")
389            else:
390                result = temp_result
391            num_iter += 1
392
393        rebalance.result()
394        # get random keys for new added nodes
395        rest_cons = [RestConnection(self.servers[i])
396                     for i in xrange(self.nodes_init + self.nodes_in)]
397        list_threads = []
398        for rest in rest_cons:
399            t = Thread(target=rest.get_random_key,
400                       name="get_random_key",
401                       args=(self.default_bucket_name,))
402            list_threads.append(t)
403            t.start()
404        [t.join() for t in list_threads]
405        self.verify_cluster_stats(self.servers[:self.nodes_in+self.nodes_init])
406        self.verify_unacked_bytes_all_buckets()
407
408    """Rebalances nodes into a cluster while doing mutations.
409
410    This test begins by loading a given number of items into the cluster.
411    It then adds two nodes at a time and rebalances that node into the cluster.
412    During the rebalance we update(all of the items in the cluster)/
413    delete(num_items/(num_servers -1) in each iteration)/
414    create(a half of initial items in each iteration).
415    Once the cluster has been rebalanced we wait for the disk queues to drain,
416    and then verify that there has been no data loss,  sum(curr_items)
417    match the curr_items_total.
418    Once all nodes have been rebalanced in the test is finished."""
419    def incremental_rebalance_in_with_ops(self):
420        for i in range(1, self.num_servers, 2):
421            tasks = list()
422            if self.doc_ops is not None:
423                # define which doc's operation will be performed during
424                # rebalancing only one type of ops can be passed
425                if "update" in self.doc_ops:
426                    # 1/2th of data will be updated in each iteration
427                    tasks += self._async_load_all_buckets(self.master, self.gen_update, "update", 0, batch_size=20000, pause_secs=5, timeout_secs=180)
428                elif "create" in self.doc_ops:
429                    # 1/2th of initial data will be added in each iteration
430                    gen_create = BlobGenerator('mike', 'mike-', self.value_size, start=self.num_items * (1 + i) / 2.0 , end=self.num_items * (1 + i / 2.0))
431                    tasks += self._async_load_all_buckets(self.master, gen_create, "create", 0, batch_size=20000, pause_secs=5, timeout_secs=180)
432                elif "delete" in self.doc_ops:
433                    # 1/(num_servers) of initial data will be removed after each iteration
434                    # at the end we should get empty base( or couple items)
435                    gen_delete = BlobGenerator('mike', 'mike-', self.value_size, start=int(self.num_items * (1 - i / (self.num_servers - 1.0))) + 1, end=int(self.num_items * (1 - (i - 1) / (self.num_servers - 1.0))))
436                    tasks += self._async_load_all_buckets(self.master, gen_delete, "delete", 0, batch_size=20000, pause_secs=5, timeout_secs=180)
437
438            rebalance_task = self.cluster.async_rebalance(
439                self.servers[:i], self.servers[i:i + 2], [],
440                sleep_before_rebalance=self.sleep_before_rebalance)
441
442            rebalance_task.result()
443            for task in tasks:
444                task.result()
445            self.verify_cluster_stats(self.servers[:i + 2])
446
447            # Validate seq_no snap_start/stop values after rebalance
448            self.check_snap_start_corruption()
449
450        self.verify_unacked_bytes_all_buckets()
451
452    """Rebalances nodes into a cluster  during view queries.
453
454    This test begins by loading a given number of items into the cluster.
455    It creates num_views as development/production views with default
456    map view funcs(is_dev_ddoc = True by default). It then adds nodes_in nodes
457    at a time and rebalances that node into the cluster. During the rebalancing
458    we perform view queries for all views and verify the expected number of docs for them.
459    Perform the same view queries after cluster has been completed. Then we wait for
460    the disk queues to drain, and then verify that there has been no data loss,
461    sum(curr_items) match the curr_items_total.
462    Once successful view queries the test is finished.
463
464    added reproducer for MB-6683"""
465    def rebalance_in_with_queries(self):
466
467
468        self._wait_for_stats_all_buckets(self.servers[:self.nodes_init])
469
470        num_views = self.input.param("num_views", 5)
471        is_dev_ddoc = self.input.param("is_dev_ddoc", True)
472        reproducer = self.input.param("reproducer", False)
473        num_tries = self.input.param("num_tries", 10)
474        iterations_to_try = (1, num_tries)[reproducer]
475        ddoc_name = "ddoc1"
476        prefix = ("", "dev_")[is_dev_ddoc]
477
478        query = {}
479        query["connectionTimeout"] = 60000;
480        query["full_set"] = "true"
481
482        views = []
483        tasks = []
484        for bucket in self.buckets:
485            temp = self.make_default_views(self.default_view_name, num_views,
486                                           is_dev_ddoc, different_map=reproducer)
487            temp_tasks = self.async_create_views(self.master, ddoc_name, temp, bucket)
488            views += temp
489            tasks += temp_tasks
490
491        timeout = None
492        if self.active_resident_threshold == 0:
493            timeout = max(self.wait_timeout * 4, len(self.buckets) * self.wait_timeout * self.num_items / 50000)
494
495        for task in tasks:
496            task.result(self.wait_timeout * 20)
497
498        for bucket in self.buckets:
499                for view in views:
500                    # run queries to create indexes
501                    self.cluster.query_view(self.master, prefix + ddoc_name, view.name, query)
502
503        active_tasks = self.cluster.async_monitor_active_task(self.servers[:self.nodes_init], "indexer", "_design/" + prefix + ddoc_name, wait_task=False)
504        for active_task in active_tasks:
505            result = active_task.result()
506            self.assertTrue(result)
507
508        expected_rows = None
509        if self.max_verify:
510            expected_rows = self.max_verify
511            query["limit"] = expected_rows
512        query["stale"] = "false"
513
514
515        for bucket in self.buckets:
516            self.perform_verify_queries(num_views, prefix, ddoc_name, query, bucket=bucket, wait_time=timeout, expected_rows=expected_rows)
517        for i in xrange(iterations_to_try):
518            servs_in = self.servers[self.nodes_init:self.nodes_init + self.nodes_in]
519            rebalance = self.cluster.async_rebalance(
520                [self.master], servs_in, [],
521                sleep_before_rebalance=self.sleep_before_rebalance)
522            self.sleep(self.wait_timeout / 5)
523
524            # see that the result of view queries are the same as expected during the test
525            for bucket in self.buckets:
526                self.perform_verify_queries(num_views, prefix, ddoc_name, query, bucket=bucket, wait_time=timeout, expected_rows=expected_rows)
527
528            rebalance.result()
529
530            # verify view queries results after rebalancing
531            for bucket in self.buckets:
532                self.perform_verify_queries(num_views, prefix, ddoc_name, query, bucket=bucket, wait_time=timeout, expected_rows=expected_rows)
533
534            self.verify_cluster_stats(self.servers[:self.nodes_in + self.nodes_init])
535            if reproducer:
536                rebalance = self.cluster.async_rebalance(
537                    self.servers, [], servs_in,
538                    sleep_before_rebalance=self.sleep_before_rebalance)
539                rebalance.result()
540                self.sleep(self.wait_timeout)
541        self.verify_unacked_bytes_all_buckets()
542
543    """Rebalances nodes into a cluster incremental during view queries.
544
545    This test begins by loading a given number of items into the cluster. It creates num_views as
546    development/production view with default map view funcs(is_dev_ddoc = True by default).
547    It then adds two nodes at a time and rebalances that node into the cluster. During the rebalancing
548    we perform view queries for all views and verify the expected number of docs for them.
549    Perform the same view queries after cluster has been completed. Then we wait for
550    the disk queues to drain, and then verify that there has been no data loss,
551    sum(curr_items) match the curr_items_total.
552    Once all nodes have been rebalanced in the test is finished."""
553    def incremental_rebalance_in_with_queries(self):
554        num_views = self.input.param("num_views", 3)
555        is_dev_ddoc = self.input.param("is_dev_ddoc", False)
556        views = self.make_default_views(self.default_view_name, num_views, is_dev_ddoc)
557        ddoc_name = "ddoc1"
558        prefix = ("", "dev_")[is_dev_ddoc]
559        # increase timeout for big data
560        timeout = max(self.wait_timeout * 4, self.wait_timeout * self.num_items / 25000)
561        query = {}
562        query["connectionTimeout"] = 60000
563        query["full_set"] = "true"
564        tasks = []
565        tasks = self.async_create_views(self.master, ddoc_name, views, self.default_bucket_name)
566        for task in tasks:
567            task.result(self.wait_timeout * 2)
568        for view in views:
569            # run queries to create indexes
570            self.cluster.query_view(self.master, prefix + ddoc_name, view.name, query)
571
572        active_tasks = self.cluster.async_monitor_active_task(self.master, "indexer", "_design/" + prefix + ddoc_name, wait_task=False)
573        for active_task in active_tasks:
574            result = active_task.result()
575            self.assertTrue(result)
576
577        expected_rows = None
578        if self.max_verify:
579            expected_rows = self.max_verify
580            query["limit"] = expected_rows
581        query["stale"] = "false"
582
583        self.perform_verify_queries(num_views, prefix, ddoc_name, query, wait_time=timeout, expected_rows=expected_rows)
584        query["stale"] = "update_after"
585        for i in range(1, self.num_servers, 2):
586            rebalance = self.cluster.async_rebalance(
587                self.servers[:i], self.servers[i:i + 2], [],
588                sleep_before_rebalance=self.sleep_before_rebalance)
589            self.sleep(self.wait_timeout / 5)
590            # see that the result of view queries are the same as expected during the test
591            self.perform_verify_queries(num_views, prefix, ddoc_name, query, wait_time=timeout, expected_rows=expected_rows)
592            # verify view queries results after rebalancing
593            rebalance.result()
594            self.perform_verify_queries(num_views, prefix, ddoc_name, query, wait_time=timeout, expected_rows=expected_rows)
595            self.verify_cluster_stats(self.servers[:i + 2])
596        self.verify_unacked_bytes_all_buckets()
597
598    """Rebalances nodes into a cluster when one node is warming up.
599
600    This test begins by loading a given number of items into the node.
601    Then it creates cluster with self.nodes_init nodes. Next steps are:
602    stop the latest node in servs_init list( if list size equals 1, master node/
603    cluster will be stopped), wait 20 sec and start the stopped node. Without waiting for
604    the node to start up completely, rebalance in servs_in servers. Expect that
605    rebalance is failed. Wait for warmup complted and strart rebalance with the same
606    configuration. Once the cluster has been rebalanced we wait for the disk queues
607    to drain, and then verify that there has been no data loss,
608    sum(curr_items) match the curr_items_total."""
609    def rebalance_in_with_warming_up(self):
610        servs_in = self.servers[self.nodes_init:self.nodes_init + self.nodes_in]
611        servs_init = self.servers[:self.nodes_init]
612        warmup_node = servs_init[-1]
613        shell = RemoteMachineShellConnection(warmup_node)
614        shell.stop_couchbase()
615        self.sleep(20)
616        shell.start_couchbase()
617        shell.disconnect()
618        try:
619            rebalance = self.cluster.async_rebalance(
620                servs_init, servs_in, [],
621                sleep_before_rebalance=self.sleep_before_rebalance)
622            rebalance.result()
623        except RebalanceFailedException:
624            self.log.info("rebalance was failed as expected")
625            self.assertTrue(ClusterOperationHelper._wait_warmup_completed(self, [warmup_node], \
626                            self.default_bucket_name, wait_time=self.wait_timeout * 10))
627
628            self.log.info("second attempt to rebalance")
629            rebalance = self.cluster.async_rebalance(
630                servs_init + servs_in, [], [],
631                sleep_before_rebalance=self.sleep_before_rebalance)
632            rebalance.result()
633        self.verify_cluster_stats(self.servers[:self.nodes_in + self.nodes_init])
634        self.verify_unacked_bytes_all_buckets()
635
636
637    """Rebalances nodes into a cluster during ddoc compaction.
638
639    This test begins by loading a given number of items into the cluster.
640    It creates num_views as development/production view with default
641    map view funcs(is_dev_ddoc = True by default). Then we disabled compaction for
642    ddoc. While we don't reach expected fragmentation for ddoc we update docs and perform
643    view queries. We rebalance in  nodes_in nodes and start compation when fragmentation
644    was reached fragmentation_value. During the rebalancing we wait
645    while compaction will be completed. After rebalancing and compaction we wait for
646    the disk queues to drain, and then verify that there has been no data loss,
647    sum(curr_items) match the curr_items_total."""
648    def rebalance_in_with_ddoc_compaction(self):
649        num_views = self.input.param("num_views", 5)
650        fragmentation_value = self.input.param("fragmentation_value", 80)
651        # now dev_ indexes are not auto-updated, doesn't work with dev view
652        is_dev_ddoc = False
653        views = self.make_default_views(self.default_view_name, num_views, is_dev_ddoc)
654        ddoc_name = "ddoc1"
655        prefix = ("", "dev_")[is_dev_ddoc]
656
657        query = {}
658        query["connectionTimeout"] = 60000;
659        query["full_set"] = "true"
660
661        expected_rows = None
662        if self.max_verify:
663            expected_rows = self.max_verify
664            query["limit"] = expected_rows
665
666        tasks = []
667        tasks = self.async_create_views(self.master, ddoc_name, views, self.default_bucket_name)
668        for task in tasks:
669            task.result(self.wait_timeout * 2)
670        self.disable_compaction()
671        fragmentation_monitor = self.cluster.async_monitor_view_fragmentation(self.master,
672                         prefix + ddoc_name, fragmentation_value, self.default_bucket_name)
673        end_time = time.time() + self.wait_timeout * 30
674        # generate load until fragmentation reached
675        while fragmentation_monitor.state != "FINISHED" and end_time > time.time():
676            # update docs to create fragmentation
677            self._load_all_buckets(self.master, self.gen_update, "update", 0)
678            for view in views:
679                # run queries to create indexes
680                self.cluster.query_view(self.master, prefix + ddoc_name, view.name, query)
681        if end_time < time.time() and fragmentation_monitor.state != "FINISHED":
682            self.fail("impossible to reach compaction value {0} after {1} sec".
683                      format(fragmentation_value, (self.wait_timeout * 30)))
684
685        fragmentation_monitor.result()
686
687        for i in xrange(3):
688            active_tasks = self.cluster.async_monitor_active_task(self.master, "indexer", "_design/" + ddoc_name, wait_task=False)
689            for active_task in active_tasks:
690                result = active_task.result()
691                self.assertTrue(result)
692            self.sleep(2)
693
694        query["stale"] = "false"
695
696        self.perform_verify_queries(num_views, prefix, ddoc_name, query, wait_time=self.wait_timeout * 3, expected_rows=expected_rows)
697
698        compaction_task = self.cluster.async_compact_view(self.master, prefix + ddoc_name, self.default_bucket_name, with_rebalance=True)
699        servs_in = self.servers[1:self.nodes_in + 1]
700        rebalance = self.cluster.async_rebalance(
701            [self.master], servs_in, [],
702            sleep_before_rebalance=self.sleep_before_rebalance)
703        result = compaction_task.result(self.wait_timeout * 10)
704        self.assertTrue(result)
705        rebalance.result()
706        self.verify_cluster_stats(self.servers[:self.nodes_in + 1])
707        self.verify_unacked_bytes_all_buckets()
708
709    """Rebalances nodes into a cluster while doing mutations and deletions.
710
711    This test begins by loading a given number of items into the cluster. It then
712    adds one node at a time and rebalances that node into the cluster. During the
713    rebalance we update half of the items in the cluster and delete the other half.
714    Once the cluster has been rebalanced we recreate the deleted items, wait for the
715    disk queues to drain, and then verify that there has been no data loss.
716    sum(curr_items) match the curr_items_total.
717    Once all nodes have been rebalanced in the test is finished."""
718    def incremental_rebalance_in_with_mutation_and_deletion(self):
719        gen_delete = BlobGenerator('mike', 'mike-', self.value_size, start=self.num_items / 2,
720                              end=self.num_items)
721
722        for i in range(self.num_servers)[1:]:
723            rebalance = self.cluster.async_rebalance(
724                self.servers[:i], [self.servers[i]], [],
725                sleep_before_rebalance=self.sleep_before_rebalance)
726            self._load_all_buckets(self.master, self.gen_update, "update", 0)
727            self._load_all_buckets(self.master, gen_delete, "delete", 0)
728            rebalance.result()
729            self._load_all_buckets(self.master, gen_delete, "create", 0)
730            self.verify_cluster_stats(self.servers[:i + 1])
731        self.verify_unacked_bytes_all_buckets()
732
733    """Rebalances nodes into a cluster while doing mutations and expirations.
734
735    This test begins by loading a given number of items into the cluster. It then
736    adds one node at a time and rebalances that node into the cluster. During the
737    rebalance we update all items in the cluster. Half of the items updated are also
738    given an expiration time of 5 seconds. Once the cluster has been rebalanced we
739    recreate the expired items, wait for the disk queues to drain, and then verify
740    that there has been no data loss, sum(curr_items) match the curr_items_total.
741    Once all nodes have been rebalanced in the test is finished."""
742    def incremental_rebalance_in_with_mutation_and_expiration(self):
743        gen_2 = BlobGenerator('mike', 'mike-', self.value_size, start=self.num_items / 2,
744                              end=self.num_items)
745        for i in range(self.num_servers)[1:]:
746            rebalance = self.cluster.async_rebalance(
747                self.servers[:i], [self.servers[i]], [],
748                sleep_before_rebalance=self.sleep_before_rebalance)
749            self._load_all_buckets(self.master, self.gen_update, "update", 0)
750            self._load_all_buckets(self.master, gen_2, "update", 5)
751            self.sleep(5)
752            rebalance.result()
753            self._load_all_buckets(self.master, gen_2, "create", 0)
754            self.verify_cluster_stats(self.servers[:i + 1])
755        self.verify_unacked_bytes_all_buckets()
756
757    '''
758    test rebalances nodes_in nodes ,
759    changes bucket passwords and then rebalances nodes_in_second nodes
760    '''
761    def rebalance_in_with_bucket_password_change(self):
762        if self.sasl_buckets == 0:
763            self.fail("no sasl buckets are specified!")
764        new_pass = self.input.param("new_pass", "new_pass")
765        servs_in = self.servers[self.nodes_init:self.nodes_init + self.nodes_in]
766        nodes_in_second = self.input.param("nodes_in_second", 1)
767        servs_in_second = self.servers[self.nodes_init + self.nodes_in:
768                                       self.nodes_init + self.nodes_in + nodes_in_second]
769        servs_init = self.servers[:self.nodes_init]
770        servs_result = self.servers[:self.nodes_init + self.nodes_in]
771
772        rebalance = self.cluster.async_rebalance(
773            servs_init, servs_in, [],
774            sleep_before_rebalance=self.sleep_before_rebalance)
775        rebalance.result()
776        rest = RestConnection(self.master)
777        bucket_to_change = [bucket for bucket in self.buckets
778                            if bucket.authType == 'sasl' and bucket.name != 'default'][0]
779        rest.change_bucket_props(bucket_to_change, saslPassword=new_pass)
780        rebalance = self.cluster.async_rebalance(
781            servs_result, servs_in_second, [],
782            sleep_before_rebalance=self.sleep_before_rebalance)
783        rebalance.result()
784        self.verify_unacked_bytes_all_buckets()
785
786    '''
787    test changes password of cluster during rebalance.
788    http://www.couchbase.com/issues/browse/MB-6459
789    '''
790    def rebalance_in_with_cluster_password_change(self):
791        new_password = self.input.param("new_password", "new_pass")
792        servs_result = self.servers[:self.nodes_init + self.nodes_in]
793        rebalance = self.cluster.async_rebalance(
794            self.servers[:self.nodes_init],
795            self.servers[self.nodes_init:self.nodes_init + self.nodes_in], [],
796            sleep_before_rebalance=self.sleep_before_rebalance)
797        old_pass = self.master.rest_password
798        self.sleep(10, "Wait for rebalance have some progress")
799        self.change_password(new_password=new_password)
800        try:
801            rebalance.result()
802            self.log.exception("rebalance should be failed when password is changing")
803            self.verify_unacked_bytes_all_buckets()
804        except Exception as ex:
805            self.sleep(10, "wait for rebalance failed")
806            rest = RestConnection(self.master)
807            self.log.info("Latest logs from UI:")
808            for i in rest.get_logs(): self.log.error(i)
809            self.assertFalse(RestHelper(rest).is_cluster_rebalanced())
810        finally:
811            self.change_password(new_password=old_pass)
812
813    '''
814    test changes ram quota during rebalance.
815    http://www.couchbase.com/issues/browse/CBQE-1649
816    '''
817    def test_rebalance_in_with_cluster_ramquota_change(self):
818        rebalance = self.cluster.async_rebalance(
819            self.servers[:self.nodes_init],
820            self.servers[self.nodes_init:self.nodes_init + self.nodes_in], [],
821            sleep_before_rebalance=self.sleep_before_rebalance)
822        self.sleep(10, "Wait for rebalance have some progress")
823        remote = RemoteMachineShellConnection(self.master)
824        cli_command = "setting-cluster"
825        options = "--cluster-ramsize=%s" % (3000)
826        output, error = remote.execute_couchbase_cli(cli_command=cli_command, options=options, cluster_host="localhost",
827                                                     user=self.master.rest_username, password=self.master.rest_password)
828        self.assertTrue('\n'.join(output).find('SUCCESS') != -1, 'RAM wasn\'t chnged')
829        rebalance.result()
830
831
832class RebalanceWithPillowFight(BaseTestCase):
833
834    def load(self, server, items, batch=1000):
835        import subprocess
836        from lib.testconstants import COUCHBASE_FROM_SPOCK
837        rest = RestConnection(server)
838        cmd = "cbc version"
839        rc = subprocess.call(cmd, shell=True)
840        if rc != 0:
841            self.fail("Exception running cbc-version: subprocess module returned non-zero response!")
842        cmd = "cbc-pillowfight -U couchbase://{0}/default -I {1} -M 50 -B 1000 --populate-only --json" \
843            .format(server.ip, items, batch)
844        if rest.get_nodes_version()[:5] in COUCHBASE_FROM_SPOCK:
845            cmd += " -u Administrator -P password"
846        self.log.info("Executing '{0}'...".format(cmd))
847        rc = subprocess.call(cmd, shell=True)
848        if rc != 0:
849            self.fail("Exception running cbc-pillowfight: subprocess module returned non-zero response!")
850
851    def check_dataloss(self, server, bucket):
852        from couchbase.bucket import Bucket
853        from couchbase.exceptions import NotFoundError
854        from lib.memcached.helper.data_helper import VBucketAwareMemcached
855        bkt = Bucket('couchbase://{0}/{1}'.format(server.ip, bucket.name), username="cbadminbucket", password="password")
856        rest = RestConnection(self.master)
857        VBucketAware = VBucketAwareMemcached(rest, bucket.name)
858        _, _, _ = VBucketAware.request_map(rest, bucket.name)
859        batch_start = 0
860        batch_end = 0
861        batch_size = 10000
862        errors = []
863        while self.num_items > batch_end:
864            batch_end = batch_start + batch_size
865            keys = []
866            for i in xrange(batch_start, batch_end, 1):
867                keys.append(str(i).rjust(20, '0'))
868            try:
869                bkt.get_multi(keys)
870                self.log.info("Able to fetch keys starting from {0} to {1}".format(keys[0], keys[len(keys)-1]))
871            except Exception as e:
872                self.log.error(e)
873                self.log.info("Now trying keys in the batch one at a time...")
874                key = ''
875                try:
876                    for key in keys:
877                        bkt.get(key)
878                except NotFoundError:
879                    vBucketId = VBucketAware._get_vBucket_id(key)
880                    errors.append("Missing key: {0}, VBucketId: {1}".
881                                  format(key, vBucketId))
882            batch_start += batch_size
883        return errors
884
885    def test_dataloss_rebalance_in(self):
886        rest = RestConnection(self.master)
887        bucket = rest.get_buckets()[0]
888        load_thread = Thread(target=self.load,
889                               name="pillowfight_load",
890                               args=(self.master, self.num_items))
891
892        self.log.info('starting the load thread...')
893        load_thread.start()
894        rebalance = self.cluster.async_rebalance(
895            self.servers[:self.nodes_init],
896            self.servers[self.nodes_init:self.nodes_init + self.nodes_in], [],
897            sleep_before_rebalance=self.sleep_before_rebalance)
898        rebalance.result()
899        load_thread.join()
900        errors = self.check_dataloss(self.master, bucket)
901        if errors:
902            self.log.info("Printing missing keys:")
903        for error in errors:
904            print error
905        if self.num_items != rest.get_active_key_count(bucket):
906            self.fail("FATAL: Data loss detected!! Docs loaded : {0}, docs present: {1}".
907                          format(self.num_items, rest.get_active_key_count(bucket) ))
908
909