from newupgradebasetest import NewUpgradeBaseTest
import Queue
import copy
from membase.helper.cluster_helper import ClusterOperationHelper
import threading
from random import randrange, randint
from remote.remote_util import RemoteMachineShellConnection, RemoteUtilHelper
from couchbase_helper.tuq_helper import N1QLHelper
from eventing.eventing_base import EventingBaseTest
from pytests.eventing.eventing_constants import HANDLER_CODE
from lib.testconstants import STANDARD_BUCKET_PORT
from membase.api.rest_client import RestConnection, RestHelper
from couchbase_helper.documentgenerator import BlobGenerator
from membase.helper.bucket_helper import BucketOperationHelper

class UpgradeTests(NewUpgradeBaseTest, EventingBaseTest):

    def setUp(self):
        super(UpgradeTests, self).setUp()
        self.queue = Queue.Queue()
        self.graceful = self.input.param("graceful", False)
        self.after_upgrade_nodes_in = self.input.param("after_upgrade_nodes_in", 1)
        self.after_upgrade_nodes_out = self.input.param("after_upgrade_nodes_out", 1)
        self.verify_vbucket_info = self.input.param("verify_vbucket_info", True)
        self.initialize_events = self.input.param("initialize_events", "").split(":")
        self.upgrade_services_in = self.input.param("upgrade_services_in", None)
        self.after_upgrade_services_in = \
                              self.input.param("after_upgrade_services_in", None)
        self.after_upgrade_services_out_dist = \
                            self.input.param("after_upgrade_services_out_dist", None)
        self.in_between_events = self.input.param("in_between_events", "").split(":")
        self.after_events = self.input.param("after_events", "").split(":")
        self.before_events = self.input.param("before_events", "").split(":")
        self.upgrade_type = self.input.param("upgrade_type", "online")
        self.sherlock_upgrade = self.input.param("sherlock", False)
        self.max_verify = self.input.param("max_verify", None)
        self.verify_after_events = self.input.param("verify_after_events", True)
        self.online_upgrade_type = self.input.param("online_upgrade_type", "swap")
        self.src_bucket_name = self.input.param('src_bucket_name', 'src_bucket')
        self.eventing_log_level = self.input.param('eventing_log_level', 'INFO')
        self.dst_bucket_name = self.input.param('dst_bucket_name', 'dst_bucket')
        self.dst_bucket_name1 = self.input.param('dst_bucket_name1', 'dst_bucket1')
        self.metadata_bucket_name = self.input.param('metadata_bucket_name', 'metadata')
        self.create_functions_buckets = self.input.param('create_functions_buckets', True)
        self.use_memory_manager = self.input.param('use_memory_manager', True)
        self.final_events = []
        self.n1ql_helper = None
        self.total_buckets = 1
        self.in_servers_pool = self._convert_server_map(self.servers[:self.nodes_init])
        """ Init nodes to not upgrade yet """
        for key in self.in_servers_pool.keys():
            self.in_servers_pool[key].upgraded = False
        self.out_servers_pool = self._convert_server_map(self.servers[self.nodes_init:])
        self.gen_initial_create = BlobGenerator('upgrade', 'upgrade',\
                                            self.value_size, end=self.num_items)
        self.gen_create = BlobGenerator('upgrade', 'upgrade', self.value_size,\
                            start=self.num_items + 1, end=self.num_items * 1.5)
        self.gen_update = BlobGenerator('upgrade', 'upgrade', self.value_size,\
                                   start=self.num_items / 2, end=self.num_items)
        self.gen_delete = BlobGenerator('upgrade', 'upgrade', self.value_size,\
                           start=self.num_items / 4, end=self.num_items / 2 - 1)
        self.after_gen_create = BlobGenerator('upgrade', 'upgrade',\
             self.value_size, start=self.num_items * 1.6, end=self.num_items * 2)
        self.after_gen_update = BlobGenerator('upgrade', 'upgrade',\
                                  self.value_size, start=1, end=self.num_items / 4)
        self.after_gen_delete = BlobGenerator('upgrade', 'upgrade',\
                                      self.value_size, start=self.num_items * .5,\
                                                         end=self.num_items * 0.75)
        initial_services_setting = self.input.param("initial-services-setting", None)
        self._install(self.servers[:self.nodes_init])
        if not self.init_nodes and initial_services_setting is not None:
            self.initialize_nodes(self.servers[:self.nodes_init],
                                  services=initial_services_setting)
        self._log_start(self)
        if len(self.servers[:self.nodes_init]) > 1:
            if initial_services_setting is None:
                self.cluster.rebalance(self.servers[:1], self.servers[1:self.nodes_init],
                                                   [], use_hostnames=self.use_hostnames)
            else:
                set_services = self.initial_services(initial_services_setting)
                for i in range(1, len(set_services)):
                    self.cluster.rebalance([self.servers[0]], [self.servers[i]], [],
                                                   use_hostnames=self.use_hostnames,
                                                         services=[set_services[i]])
                    self.sleep(10)
        else:
            self.cluster.rebalance([self.servers[0]], self.servers[1:], [],
                                         use_hostnames=self.use_hostnames)
        self.sleep(10)
        """ Sometimes, when an upgrade failed and Couchbase Server is not yet
            installed on a node, we could not set the quota at the beginning of
            the test.  We have to wait for the new server to be installed and
            set the quota properly here """
        servers_available = copy.deepcopy(self.servers)
        if len(self.servers) > int(self.nodes_init):
            servers_available = servers_available[:self.nodes_init]
        self.quota = self._initialize_nodes(self.cluster, servers_available,\
                                             self.disabled_consistent_view,\
                                        self.rebalanceIndexWaitingDisabled,\
                                        self.rebalanceIndexPausingDisabled,\
                                                  self.maxParallelIndexers,\
                                           self.maxParallelReplicaIndexers,\
                                                                 self.port)
        self.add_built_in_server_user(node=self.master)
        self.bucket_size = self._get_bucket_size(self.quota, self.total_buckets)
        self.create_buckets()
        self.n1ql_server = None
        self.success_run = True
        self.failed_thread = None
        self.generate_map_nodes_out_dist_upgrade(self.after_upgrade_services_out_dist)
        if self.upgrade_services_in != "same":
            self.upgrade_services_in = self.get_services(self.in_servers_pool.values(),
                                          self.upgrade_services_in, start_node = 0)
        self.after_upgrade_services_in = self.get_services(self.out_servers_pool.values(),
                                           self.after_upgrade_services_in, start_node = 0)
        self.fts_obj = None
        self.index_name_prefix = None

    def tearDown(self):
        super(UpgradeTests, self).tearDown()

    def test_upgrade(self):
        self.event_threads = []
        self.after_event_threads = []
        try:
            self.log.info("\n*** Start init operations before upgrade begins ***")
            if self.initialize_events:
                initialize_events = self.run_event(self.initialize_events)
                self.finish_events(initialize_events)
            if not self.success_run and self.failed_thread is not None:
                raise Exception("*** Failed to {0} ***".format(self.failed_thread))
            self.cluster_stats(self.servers[:self.nodes_init])
            if self.before_events:
                self.event_threads += self.run_event(self.before_events)
            self.log.info("\n*** Start upgrade cluster ***")
            self.event_threads += self.upgrade_event()
            if self.upgrade_type == "online":
                self.monitor_dcp_rebalance()
            self.finish_events(self.event_threads)
            self.log.info("\nWill install upgrade version on any free nodes")
            out_nodes = self._get_free_nodes()
            self.log.info("Here are the free nodes: {0}".format(out_nodes))
            """ only install nodes out when there is cluster operation """
            cluster_ops = ["rebalance_in", "rebalance_out", "rebalance_in_out"]
            for event in self.after_events[0].split("-"):
                if event in cluster_ops:
                    self.log.info("\n\nThere are cluster ops after upgrade.  Need to "
                                  "install the upgrade version on the free nodes")
                    self._install(out_nodes)
                    break
            self.generate_map_nodes_out_dist_upgrade(\
                                      self.after_upgrade_services_out_dist)
            self.log.info("\n\n*** Start operations after upgrade is done ***")
            self.add_built_in_server_user()
            if self.after_events:
                self.after_event_threads = self.run_event(self.after_events)
            self.finish_events(self.after_event_threads)
            if not self.success_run and self.failed_thread is not None:
                raise Exception("*** Failed to {0} ***".format(self.failed_thread))
            """ Default is to always verify data """
            if self.after_events[0]:
                self.log.info("*** Start after events ***")
                for event in self.after_events[0].split("-"):
                    if "delete_buckets" in event:
                        self.log.info("After events include a delete buckets event. "
                                      "No items verification needed")
                        self.verify_after_events = False
                        break
            if self.verify_after_events:
                self.log.info("*** Start data verification ***")
                self.cluster_stats(self.in_servers_pool.values())
                self._verify_data_active_replica()
        except Exception, ex:
            self.log.info(ex)
            print "*** Stop all events to stop the test ***"
            self.stop_all_events(self.event_threads)
            self.stop_all_events(self.after_event_threads)
            raise
        finally:
            self.log.info("Clean up any events that registered cleanup actions")
            self.cleanup_events()

    def _record_vbuckets(self, master, servers):
        vbucket_map = {}
        for bucket in self.buckets:
            self.log.info(" record vbucket for the bucket {0}".format(bucket.name))
            vbucket_map[bucket.name] = RestHelper(RestConnection(master))\
                                   ._get_vbuckets(servers, bucket_name=bucket.name)
        return vbucket_map

    def _find_master(self):
        self.master = self.in_servers_pool.values()[0]

    def _verify_data_active_replica(self):
        """ data analysis is disabled by default; enable with data_analysis=True """
        self.data_analysis = self.input.param("data_analysis", False)
        self.total_vbuckets = self.initial_vbuckets
        if self.data_analysis:
            disk_replica_dataset, disk_active_dataset = \
                        self.get_and_compare_active_replica_data_set_all(\
                                           self.in_servers_pool.values(),\
                                                            self.buckets,\
                                                                path=None)
            self.data_analysis_active_replica_all(disk_active_dataset,\
                                                 disk_replica_dataset,\
                                        self.in_servers_pool.values(),\
                                               self.buckets, path=None)
            """ check vbucket distribution analysis after rebalance """
            self.vb_distribution_analysis(servers = \
                      self.in_servers_pool.values(),\
                             buckets = self.buckets,\
                                          std = 1.0,\
                total_vbuckets = self.total_vbuckets)

    def _verify_vbuckets(self, old_vbucket_map, new_vbucket_map):
        for bucket in self.buckets:
            self._verify_vbucket_nums_for_swap(old_vbucket_map[bucket.name],\
                                                new_vbucket_map[bucket.name])

    def stop_all_events(self, thread_list):
        for t in thread_list:
            try:
                if t.isAlive():
                    t.stop()
            except Exception, ex:
                self.log.info(ex)

    def cleanup_events(self):
        thread_list = []
        for event in self.final_events:
            t = threading.Thread(target=self.find_function(event), args=())
            t.daemon = True
            t.start()
            thread_list.append(t)
        for t in thread_list:
            t.join()

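    # Note on the event mechanism: event names given in conf params such as
    # initialize_events/before_events/after_events are looked up as methods on
    # this class by find_function().  Colon-separated events run as parallel
    # threads, while events joined with "-" run one after another via
    # run_event_in_sequence() below.  For example, a conf value like
    # after_events=rebalance_in-create_index (an illustrative value, not one
    # taken from a real conf file) rebalances a node in and only then creates
    # the index.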
    def run_event_in_sequence(self, events):
        q = self.queue
        self.log.info("run_event_in_sequence")
        for event in events.split("-"):
            t = threading.Thread(target=self.find_function(event), args=(q,))
            t.daemon = True
            t.start()
            t.join()
            self.success_run = True
            while not self.queue.empty():
                self.success_run &= self.queue.get()
            if not self.success_run:
                self.failed_thread = event
                break

    def run_event(self, events):
        thread_list = []
        for event in events:
            if "-" in event:
                t = threading.Thread(target=self.run_event_in_sequence, args=(event,))
                t.start()
                t.join()
            elif event != '':
                t = threading.Thread(target=self.find_function(event), args=())
                t.daemon = True
                t.start()
                thread_list.append(t)
        return thread_list

    def find_function(self, event):
        return getattr(self, event)

    def finish_events(self, thread_list):
        for t in thread_list:
            t.join()

    def upgrade_event(self):
        self.log.info("upgrade_event")
        thread_list = []
        if self.upgrade_type == "online":
            t = threading.Thread(target=self.online_upgrade, args=())
        elif self.upgrade_type == "offline":
            t = threading.Thread(target=self.offline_upgrade, args=())
        t.daemon = True
        t.start()
        thread_list.append(t)
        return thread_list

    def server_crash(self):
        try:
            self.log.info("server_crash")
            self.targetProcess = self.input.param("targetProcess", 'memcached')
            for node in self.nodes_out_list:
                remote = RemoteMachineShellConnection(node)
                remote.terminate_process(process_name=self.targetProcess)
        except Exception, ex:
            self.log.info(ex)
            raise

    def server_stop(self):
        try:
            self.log.info("server_stop")
            for node in self.nodes_out_list:
                remote = RemoteMachineShellConnection(node)
                remote.stop_server()
            self.final_events.append("start_server")
        except Exception, ex:
            self.log.info(ex)
            raise

    def start_server(self):
        try:
            self.log.info("start_server")
            for node in self.nodes_out_list:
                remote = RemoteMachineShellConnection(node)
                remote.start_server()
        except Exception, ex:
            self.log.info(ex)
            raise

    def failover(self, queue=None):
        failover_node = False
        try:
            self.log.info("Failover a node")
            print "failover node   ", self.nodes_out_list
            nodes = self.get_nodes_in_cluster_after_upgrade()
            failover_task = self.cluster.async_failover([self.master],
                        failover_nodes=self.nodes_out_list, graceful=self.graceful)
            failover_task.result()
            if self.graceful:
                """ Check if rebalance is still running """
                msg = "graceful failover failed for nodes"
                self.assertTrue(RestConnection(self.master).monitorRebalance(\
                                                     stop_if_loop=True), msg=msg)
                rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init],
                                       [], self.nodes_out_list)
                rebalance.result()
                failover_node = True
            else:
                msg = "Failed to failover a node"
                self.assertTrue(RestConnection(self.master).monitorRebalance(\
                                                     stop_if_loop=True), msg=msg)
                rebalance = self.cluster.async_rebalance(nodes, [],
                                                    self.nodes_out_list)
                rebalance.result()
                failover_node = True
        except Exception, ex:
            self.log.info(ex)
            if queue is not None:
                queue.put(False)
        if failover_node and queue is not None:
            queue.put(True)

    def autofailover(self):
        try:
            self.log.info("autofailover")
            autofailover_timeout = 30
            status = RestConnection(self.master).update_autofailover_settings(True, autofailover_timeout)
            self.assertTrue(status, 'failed to change autofailover_settings!')
            servr_out = self.nodes_out_list
            remote = RemoteMachineShellConnection(self.nodes_out_list[0])
            remote.stop_server()
            self.sleep(autofailover_timeout + 10, "Wait for autofailover")
            rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init],
                                       [], [self.nodes_out_list[0]])
            rebalance.result()
        except Exception, ex:
            self.log.info(ex)
            raise

    def network_partitioning(self):
        try:
            self.log.info("network_partitioning")
            for node in self.nodes_out_list:
                self.start_firewall_on_node(node)
            self.final_events.append("undo_network_partitioning")
        except Exception, ex:
            self.log.info(ex)
            raise

    def undo_network_partitioning(self):
        try:
            self.log.info("undo_network_partitioning")
            for node in self.nodes_out_list:
                self.stop_firewall_on_node(node)
        except Exception, ex:
            self.log.info(ex)
            raise

    def bucket_compaction(self):
        try:
            self.log.info("couchbase_bucket_compaction")
            compact_tasks = []
            for bucket in self.buckets:
                compact_tasks.append(self.cluster.async_compact_bucket(self.master, bucket))
            for task in compact_tasks:
                task.result()
        except Exception, ex:
            self.log.info(ex)
            raise

    def warmup(self, queue=None):
        node_warmed_up = False
        try:
            self.log.info("Start warmup operation")
            nodes = self.get_nodes_in_cluster_after_upgrade()
            for server in nodes:
                remote = RemoteMachineShellConnection(server)
                remote.stop_server()
                remote.start_server()
                remote.disconnect()
            ClusterOperationHelper.wait_for_ns_servers_or_assert(nodes, self)
            node_warmed_up = True
        except Exception, ex:
            self.log.info(ex)
            if queue is not None:
                queue.put(False)
        if node_warmed_up and queue is not None:
            queue.put(True)

    def create_lww_bucket(self):
        self.time_synchronization = 'enabledWithOutDrift'
        bucket = 'default'
        print 'time_sync {0}'.format(self.time_synchronization)

        helper = RestHelper(self.rest)
        if not helper.bucket_exists(bucket):
            node_ram_ratio = BucketOperationHelper.base_bucket_ratio(
                self.servers)
            info = self.rest.get_nodes_self()
            self.rest.create_bucket(bucket=bucket,
                ramQuotaMB=512, authType='sasl', timeSynchronization=self.time_synchronization)
            try:
                ready = BucketOperationHelper.wait_for_memcached(self.master,
                    bucket)
                self.assertTrue(ready, msg='[ERROR] Expect lww bucket creation to work.')
            finally:
                self.log.info("Success, created lww bucket")


    def bucket_flush(self, queue=None):
        bucket_flushed = False
        try:
            self.log.info("bucket_flush ops")
            self.rest = RestConnection(self.master)
            for bucket in self.buckets:
                self.rest.flush_bucket(bucket.name)
            bucket_flushed = True
        except Exception, ex:
            self.log.info(ex)
            if queue is not None:
                queue.put(False)
        if bucket_flushed and queue is not None:
            queue.put(True)

    def delete_buckets(self, queue=None):
        bucket_deleted = False
        try:
            self.log.info("delete_buckets")
            self.rest = RestConnection(self.master)
            for bucket in self.buckets:
                self.log.info("delete bucket {0}".format(bucket.name))
                self.rest.delete_bucket(bucket.name)
            bucket_deleted = True
        except Exception, ex:
            self.log.info(ex)
            if queue is not None:
                queue.put(False)
        if bucket_deleted and queue is not None:
            queue.put(True)

    def create_buckets(self, queue=None):
        bucket_created = False
        try:
            self.log.info("create_buckets")
            if self.dgm_run:
                self.bucket_size = 256
                self.default_bucket = False
                self.sasl_buckets = 1
                self.sasl_bucket_name = self.sasl_bucket_name + "_" \
                                            + str(self.total_buckets)
            self.rest = RestConnection(self.master)
            self._bucket_creation()
            self.sleep(5, "sleep after create bucket")
            self.total_buckets += 1
            bucket_created = True
        except Exception, ex:
            self.log.info(ex)
            if queue is not None:
                queue.put(False)
        if bucket_created and queue is not None:
            queue.put(True)

    def change_bucket_properties(self):
        try:
            self.rest = RestConnection(self.master)
            # Change bucket properties
            for bucket in self.buckets:
                self.rest.change_bucket_props(bucket, ramQuotaMB=None,\
                       authType=None, saslPassword=None, replicaNumber=0,\
                    proxyPort=None, replicaIndex=None, flushEnabled=False)
        except Exception, ex:
            self.log.info(ex)
            raise

    def rebalance_in(self, queue=None):
        rebalance_in = False
        service_in = copy.deepcopy(self.after_upgrade_services_in)
        if service_in is None:
            service_in = ["kv"]
        free_nodes = self._convert_server_map(self._get_free_nodes())
        if not free_nodes.values():
            raise Exception("No free node available to rebalance in")
        try:
            self.nodes_in_list = self.out_servers_pool.values()[:self.nodes_in]
            if int(self.nodes_in) == 1 and len(free_nodes.keys()) > 1:
                free_node_in = [free_nodes.values()[0]]
                if self.after_upgrade_services_in and \
                        len(self.after_upgrade_services_in) > 1:
                    service_in = [self.after_upgrade_services_in[0]]
            else:
                free_node_in = free_nodes.values()[:int(self.nodes_in)]
            self.log.info("<<<=== rebalance_in node {0} with services {1}"\
                                  .format(free_node_in, service_in[0]))
            rebalance = \
                   self.cluster.async_rebalance(self.servers[:self.nodes_init],
                                                                  free_node_in,
                                                      [], services=service_in)
            rebalance.result()
            self.in_servers_pool.update(free_nodes)
            rebalance_in = True
            if any("index" in services for services in service_in):
                self.log.info("Set storageMode to forestdb after add "
                         "index node {0} to cluster".format(free_nodes.keys()))
                RestConnection(free_nodes.values()[0]).set_indexer_storage_mode()
            if self.after_upgrade_services_in and \
                len(self.after_upgrade_services_in) > 1:
                self.log.info("remove service '{0}' from service list after "
                        "rebalance done ".format(self.after_upgrade_services_in[0]))
                self.after_upgrade_services_in.pop(0)
            self.sleep(10, "wait 10 seconds after rebalance")
            if free_node_in and free_node_in[0] not in self.servers:
                self.servers.append(free_node_in[0])
        except Exception, ex:
            self.log.info(ex)
            if queue is not None:
                queue.put(False)
        if rebalance_in and queue is not None:
            queue.put(True)

    def rebalance_out(self, queue=None):
        rebalance_out = False
        try:
            self.log.info("=====>>>> rebalance_out node {0}"\
                              .format(self.nodes_out_list))
            rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init],\
                                                             [], self.nodes_out_list)
            rebalance.result()
            rebalance_out = True
        except Exception, ex:
            self.log.info(ex)
            if queue is not None:
                queue.put(False)
        if rebalance_out and queue is not None:
            queue.put(True)

    def rebalance_in_out(self, queue=None):
        rebalance_in_out = False
        try:
            self.nodes_in_list = self.out_servers_pool.values()[:self.nodes_in]
            self.log.info("<<<<<===== rebalance_in node {0}"\
                                                    .format(self.nodes_in_list))
            self.log.info("=====>>>>> rebalance_out node {0}"\
                                                   .format(self.nodes_out_list))
            rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init],\
                                            self.nodes_in_list, self.nodes_out_list,\
                                            services=self.after_upgrade_services_in)
            rebalance.result()
            rebalance_in_out = True
        except Exception, ex:
            self.log.info(ex)
            if queue is not None:
                queue.put(False)
        if rebalance_in_out and queue is not None:
            queue.put(True)

    def incremental_backup(self):
        self.log.info("incremental_backup")

    def full_backup(self):
        self.log.info("full_backup")

    def cb_collect_info(self):
        try:
            self.log.info("cb_collect_info")
            log_file_name = "/tmp/sample.zip"
            output, error = self.shell.execute_cbcollect_info("%s" % log_file_name)
        except Exception, ex:
            self.log.info(ex)
            raise

    def create_index(self, queue=None):
        self.log.info("create_index")
        self.index_list = {}
        create_index = False
        self._initialize_n1ql_helper()
        try:
            self.n1ql_helper.create_primary_index(using_gsi=True,
                                                  server=self.n1ql_server)
            #self.n1ql_helper.create_primary_index(using_gsi=False,
            #                                      server=self.n1ql_server)
            self.log.info("done create_index")
            create_index = True
        except Exception, e:
            self.log.info(e)
            if queue is not None:
                queue.put(False)
        if create_index and queue is not None:
            queue.put(True)

    def create_index_with_replica_and_query(self, queue=None):
        """ ,groups=simple,reset_services=True
        """
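        # For illustration only: with a hypothetical num_index_replicas=2, the
        # create_index_query built below expands to
        #   CREATE INDEX random_index_123456 ON default(age) USING GSI WITH {'num_replica': 2};
        # (the numeric suffix is random and 2 is just an example value).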
        self.log.info("Create index with replica and query")
        self.n1ql_node = self.get_nodes_from_services_map(service_type="n1ql")
        self._initialize_n1ql_helper()
        self.index_name_prefix = "random_index_" + str(randint(100000, 999999))
        create_index_query = "CREATE INDEX " + self.index_name_prefix + \
              " ON default(age) USING GSI WITH {{'num_replica': {0}}};"\
                                        .format(self.num_index_replicas)
        try:
            self.create_index()
            self.n1ql_helper.run_cbq_query(query=create_index_query,
                                           server=self.n1ql_node)
        except Exception, e:
            self.log.info(e)
        self.sleep(30)
        index_map = self.get_index_map()
        self.log.info(index_map)
        if not self.expected_err_msg:
            self.n1ql_helper.verify_replica_indexes([self.index_name_prefix],
                                                    index_map,
                                                    self.num_index_replicas)

    def verify_index_with_replica_and_query(self, queue=None):
        index_map = self.get_index_map()
        try:
            self.n1ql_helper.verify_replica_indexes([self.index_name_prefix],
                                                    index_map,
                                                    self.num_index_replicas)
        except Exception, e:
            self.log.info(e)
            if queue is not None:
                queue.put(False)

    def create_views(self, queue=None):
        self.log.info("*** create_views ***")
        """ default is 1 ddoc; change the number of ddocs with param ddocs_num=new_number.
            default is 2 views per ddoc; change the number of views with param
            view_per_ddoc=new_view_per_doc """
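        # Illustrative usage: adding ddocs_num=2,view_per_ddoc=3 to the conf line
        # would request two design docs with three views each; these numbers are
        # examples only, not values taken from any existing conf file.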
        try:
            self.create_ddocs_and_views(queue)
        except Exception, e:
            self.log.info(e)

    def query_views(self, queue=None):
        self.log.info("*** query_views ***")
        try:
            self.verify_all_queries(queue)
        except Exception, e:
            self.log.info(e)

    def drop_views(self):
        self.log.info("drop_views")

    def drop_index(self):
        self.log.info("drop_index")
        for bucket_name in self.index_list.keys():
            query = "drop index {0} on {1} using gsi"\
                 .format(self.index_list[bucket_name], bucket_name)
            self.n1ql_helper.run_cbq_query(query, self.n1ql_server)

    def query_explain(self):
        self.log.info("query_explain")
        for bucket in self.buckets:
            query = "select count(*) from {0}".format(bucket.name)
            self.n1ql_helper.run_cbq_query(query, self.n1ql_server)
            query = "explain select count(*) from {0}".format(bucket.name)
            self.n1ql_helper.run_cbq_query(query, self.n1ql_server)
            query = "select count(*) from {0} where field_1 = 1".format(bucket.name)
            self.n1ql_helper.run_cbq_query(query, self.n1ql_server)
            query = "explain select count(*) from {0} where field_1 = 1".format(bucket.name)
            self.n1ql_helper.run_cbq_query(query, self.n1ql_server)

    def change_settings(self):
        try:
            status = True
            if "update_notifications" in self.input.test_params:
                status &= self.rest.update_notifications(str(self.input.param("update_notifications", 'true')).lower())
            if "autofailover_timeout" in self.input.test_params:
                status &= self.rest.update_autofailover_settings(True, self.input.param("autofailover_timeout", None))
            if "autofailover_alerts" in self.input.test_params:
                status &= self.rest.set_alerts_settings('couchbase@localhost', 'root@localhost', 'user', 'pwd')
            if "autocompaction" in self.input.test_params:
                tmp, _, _ = self.rest.set_auto_compaction(viewFragmntThresholdPercentage=
                                         self.input.param("autocompaction", 50))
                status &= tmp
            if not status:
                self.fail("some settings were not set correctly!")
        except Exception, ex:
            self.log.info(ex)
            raise

    def create_eventing_services(self, queue=None):
        """ Only works after the cluster is completely upgraded to 5.5.0 or later """
        try:
            rest = RestConnection(self.master)
            cb_version = rest.get_nodes_version()
            if 5.5 > float(cb_version[:3]):
                self.log.info("This eventing test is only for cb version 5.5 and later.")
                return

            bucket_params = self._create_bucket_params(server=self.master, size=128,
                                                       replicas=self.num_replicas)
            self.cluster.create_standard_bucket(name=self.src_bucket_name, port=STANDARD_BUCKET_PORT + 1,
                                                bucket_params=bucket_params)
            self.buckets = RestConnection(self.master).get_buckets()
            self.src_bucket = RestConnection(self.master).get_buckets()
            self.cluster.create_standard_bucket(name=self.dst_bucket_name, port=STANDARD_BUCKET_PORT + 1,
                                                bucket_params=bucket_params)
            self.cluster.create_standard_bucket(name=self.metadata_bucket_name, port=STANDARD_BUCKET_PORT + 1,
                                                bucket_params=bucket_params)
            self.buckets = RestConnection(self.master).get_buckets()
            self.gens_load = self.generate_docs(self.docs_per_day)
            self.expiry = 3

            self.restServer = self.get_nodes_from_services_map(service_type="eventing")
            """ must be self.rest to pass in deploy_function """
            self.rest = RestConnection(self.restServer)
            self.load(self.gens_load, buckets=self.buckets, flag=self.item_flag, verify_data=False,
                  batch_size=self.batch_size)
            function_name = "Function_{0}_{1}".format(randint(1, 1000000000), self._testMethodName)
            self.function_name = function_name[0:90]
            body = self.create_save_function_body(self.function_name, HANDLER_CODE.BUCKET_OPS_ON_UPDATE, worker_count=3)
            bk_events_created = False
            rs_events_created = False
            try:
                self.deploy_function(body)
                bk_events_created = True
                self.verify_eventing_results(self.function_name, 0, skip_stats_validation=True)
            except Exception as e:
                self.log.error(e)
            finally:
                self.undeploy_and_delete_function(body)
        except Exception, e:
            self.log.info(e)

    def create_cbas_services(self, queue=None):
        """
           This test needs at most 4 servers and only applies when upgrading to vulcan (5.5) or later.
           Command to run:
            upgrade.upgrade_tests.UpgradeTests.test_upgrade,items=5000,initial_version=4.6.4-xxxx,
            nodes_init=3,initialize_events=kv_ops_initialize,upgrade_services_in='kv:index',
            after_events=rebalance_in-create_cbas_services,after_upgrade_services_in=cbas,
            dgm_run=true,upgrade_test=True,skip_init_check_cbserver=true,released_upgrade_version=5.5.0-xxx
        """
        try:
            self.validate_error = False
            rest = RestConnection(self.master)
            cb_version = rest.get_nodes_version()
            if 5.5 > float(cb_version[:3]):
                self.log.info("This analytics test is only for cb version 5.5 and later.")
                return
            self.log.info("Get cbas nodes in cluster")
            cbas_node = self.get_nodes_from_services_map(service_type="cbas")
            cbas_rest = RestConnection(self.servers[self.nodes_init])
            self.get_services_map()

            kv_nodes = copy.deepcopy(self.servers)
            kv_maps = [x.replace(":8091", "") for x in self.services_map["kv"]]
            self.log.info("Get kv nodes in cluster")
            kv_nodes = [node for node in kv_nodes if node.ip in kv_maps]
            self.cbas_node = cbas_node
            self.load_sample_buckets(servers=kv_nodes, bucketName="travel-sample",
                                              total_items=31591, rest=cbas_rest)
            self.test_create_dataset_on_bucket()
        except Exception as e:
            self.log.info(e)
            if queue is not None:
                queue.put(False)
        if queue is not None:
            queue.put(True)

    def online_upgrade(self):
        try:
            self.log.info("online_upgrade")
            self.initial_version = self.upgrade_versions[0]
            self.sleep(self.sleep_time, "Pre-setup of old version is done. "
                                    "Wait for online upgrade to {0} version"\
                                               .format(self.initial_version))
            self.product = 'couchbase-server'
            if self.online_upgrade_type == "swap":
                self.online_upgrade_swap_rebalance()
            else:
                self.online_upgrade_incremental()
        except Exception, ex:
            self.log.info(ex)
            raise

    def online_upgrade_swap_rebalance(self):
        self.log.info("online_upgrade_swap_rebalance")
        self.swap_num_servers = self.input.param('swap_num_servers', 1)
        servers = self._convert_server_map(self.servers[:self.nodes_init])
        out_servers = self._convert_server_map(self.servers[self.nodes_init:])
        self.swap_num_servers = min(self.swap_num_servers, len(out_servers))
        start_services_num = 0
        for i in range(self.nodes_init / self.swap_num_servers):
            servers_in = {}
            new_servers = copy.deepcopy(servers)
            servicesNodeOut = ""
            for key in out_servers.keys():
                servers_in[key] = out_servers[key]
                out_servers[key].upgraded = True
                out_servers.pop(key)
                if len(servers_in) == self.swap_num_servers:
                    break
            servers_out = {}
            node_out = None
            new_servers.update(servers_in)
            for key in servers.keys():
                if len(servers_out) == self.swap_num_servers:
                    break
                elif not servers[key].upgraded:
                    servers_out[key] = servers[key]
                    new_servers.pop(key)
            out_servers.update(servers_out)
            rest = RestConnection(servers.values()[0])
            self.log.info("****************************************")
            self.log.info("cluster nodes = {0}".format(servers.values()))
            self.log.info("cluster service map = {0}".format(rest.get_nodes_services()))
            self.log.info("cluster version map = {0}".format(rest.get_nodes_version()))
            self.log.info("to include in cluster = {0}".format(servers_in.values()))
            self.log.info("to exclude from cluster = {0}".format(servers_out.values()))
            self.log.info("****************************************")
            rest = RestConnection(servers_out.values()[0])
            servicesNodeOut = rest.get_nodes_services()
            servicesNodeOut = ",".join(servicesNodeOut[servers_out.keys()[0]])
            self._install(servers_in.values())
            self.sleep(10, "Wait for ns server is ready")
            old_vbucket_map = self._record_vbuckets(self.master, servers.values())
            try:
                if self.upgrade_services_in == "same":
                    self.cluster.rebalance(servers.values(), servers_in.values(),\
                                                             servers_out.values(),
                                        services=[servicesNodeOut])
                elif self.upgrade_services_in is not None and len(self.upgrade_services_in) > 0:
                    self.cluster.rebalance(servers.values(),
                                        servers_in.values(),
                                        servers_out.values(),
                                        services = \
                       self.upgrade_services_in[start_services_num:start_services_num+\
                                                             len(servers_in.values())])
                    start_services_num += len(servers_in.values())
                else:
                    self.cluster.rebalance(servers.values(), servers_in.values(),\
                                                             servers_out.values())
            except Exception, ex:
                self.log.info(ex)
                raise
            self.out_servers_pool = servers_out
            self.in_servers_pool = new_servers
            servers = new_servers
            self.servers = servers.values()
            self.master = self.servers[0]
            if self.verify_vbucket_info:
                new_vbucket_map = self._record_vbuckets(self.master, self.servers)
                self._verify_vbuckets(old_vbucket_map, new_vbucket_map)
            # in the middle of online upgrade events
            if self.in_between_events:
                self.event_threads = []
                self.event_threads += self.run_event(self.in_between_events)
                self.finish_events(self.event_threads)
                self.in_between_events = None

    def online_upgrade_incremental(self):
        self.log.info("online_upgrade_incremental")
        try:
            for server in self.servers[1:]:
                self.cluster.rebalance(self.servers, [], [server])
                self.initial_version = self.upgrade_versions[0]
                self.sleep(self.sleep_time, "Pre-setup of old version is done. Wait for online upgrade to {0} version".\
                       format(self.initial_version))
                self.product = 'couchbase-server'
                self._install([server])
                self.sleep(self.sleep_time, "Installation of new version is done. Wait for rebalance")
                self.cluster.rebalance(self.servers, [server], [])
                self.log.info("Rebalanced in upgraded nodes")
                self.sleep(self.sleep_time)
            self._new_master(self.servers[1])
            self.cluster.rebalance(self.servers, [], [self.servers[0]])
            self.log.info("Rebalanced out all old version nodes")
        except Exception, ex:
            self.log.info(ex)
            raise

    def offline_upgrade(self):
        self._offline_upgrade()

    def failover_add_back(self):
        try:
            rest = RestConnection(self.master)
            recoveryType = self.input.param("recoveryType", "full")
            servr_out = self.nodes_out_list
            failover_task = self.cluster.async_failover([self.master],
                    failover_nodes=servr_out, graceful=self.graceful)
            failover_task.result()
            nodes_all = rest.node_statuses()
            nodes = []
            if servr_out[0].ip == "127.0.0.1":
                for failover_node in servr_out:
                    nodes.extend([node for node in nodes_all
                        if (str(node.port) == failover_node.port)])
            else:
                for failover_node in servr_out:
                    nodes.extend([node for node in nodes_all
                        if node.ip == failover_node.ip])
            for node in nodes:
                self.log.info(node)
                rest.add_back_node(node.id)
                rest.set_recovery_type(otpNode=node.id, recoveryType=recoveryType)
            rebalance = self.cluster.async_rebalance(self.servers[:self.nodes_init],
                                                                             [], [])
            rebalance.result()
        except Exception, ex:
            raise

    def kv_ops_initialize(self, queue=None):
        try:
            self.log.info("kv_ops_initialize")
            self._load_all_buckets(self.master, self.gen_initial_create, "create",\
                                             self.expire_time, flag=self.item_flag)
            self.log.info("done kv_ops_initialize")
        except Exception, ex:
            self.log.info(ex)
            if queue is not None:
                queue.put(False)
            raise
        if queue is not None:
            queue.put(True)

    def kv_after_ops_create(self, queue=None):
        try:
            self.log.info("kv_after_ops_create")
            self._load_all_buckets(self.master, self.after_gen_create, "create",\
                                           self.expire_time, flag=self.item_flag)
            for bucket in self.buckets:
                self.log.info(" record vbucket for the bucket {0}"\
                                              .format(bucket.name))
                curr_items = \
                    RestConnection(self.master).get_active_key_count(bucket.name)
                self.log.info("{0} curr_items in bucket {1} "\
                              .format(curr_items, bucket.name))
        except Exception, ex:
            self.log.info(ex)
            if queue is not None:
                queue.put(False)
        if queue is not None:
            queue.put(True)

    def kv_after_ops_update(self):
        try:
            self.log.info("kv_after_ops_update")
            self._load_all_buckets(self.master, self.after_gen_update, "update",
                                          self.expire_time, flag=self.item_flag)
        except Exception, ex:
            self.log.info(ex)
            raise

    def kv_after_ops_delete(self):
        try:
            self.log.info("kv_after_ops_delete")
            self._load_all_buckets(self.master, self.after_gen_delete, "delete",
                                          self.expire_time, flag=self.item_flag)
        except Exception, ex:
            self.log.info(ex)
            raise

    def doc_ops_initialize(self, queue=None):
        try:
            self.log.info("load doc to all buckets")
            self._load_doc_data_all_buckets(data_op="create", batch_size=1000,
                                                                gen_load=None)
            self.log.info("done initialize load doc to all buckets")
        except Exception, ex:
            self.log.info(ex)
            if queue is not None:
                queue.put(False)
        if queue is not None:
            queue.put(True)

    def kv_ops_create(self):
        try:
            self.log.info("kv_ops_create")
            self._load_all_buckets(self.master, self.gen_create, "create",
                                    self.expire_time, flag=self.item_flag)
        except Exception, ex:
            self.log.info(ex)
            raise

    def kv_ops_update(self):
        try:
            self.log.info("kv_ops_update")
            self._load_all_buckets(self.master, self.gen_update, "update",
                                    self.expire_time, flag=self.item_flag)
        except Exception, ex:
            self.log.info(ex)
            raise

    def kv_ops_delete(self):
        try:
            self.log.info("kv_ops_delete")
            self._load_all_buckets(self.master, self.gen_delete, "delete",
                                    self.expire_time, flag=self.item_flag)
        except Exception, ex:
            self.log.info(ex)
            raise

    def add_sub_doc(self):
        try:
            self.log.info("add sub doc")
            """add sub doc code here"""
        except Exception, ex:
            self.log.info(ex)
            raise


    def create_fts_index(self, queue=None):
        try:
            self.log.info("Checking if index already exists ...")
            name = "default"
            """ test on one bucket """
            for bucket in self.buckets:
                name = bucket.name
                break
            SOURCE_CB_PARAMS = {
                      "authUser": "default",
                      "authPassword": "",
                      "authSaslUser": "",
                      "authSaslPassword": "",
                      "clusterManagerBackoffFactor": 0,
                      "clusterManagerSleepInitMS": 0,
                      "clusterManagerSleepMaxMS": 20000,
                      "dataManagerBackoffFactor": 0,
                      "dataManagerSleepInitMS": 0,
                      "dataManagerSleepMaxMS": 20000,
                      "feedBufferSizeBytes": 0,
                      "feedBufferAckThreshold": 0
                       }
            self.index_type = 'fulltext-index'
            self.index_definition = {
                          "type": "fulltext-index",
                          "name": "",
                          "uuid": "",
                          "params": {},
                          "sourceType": "couchbase",
                          "sourceName": "",
                          "sourceUUID": "",
                          "sourceParams": SOURCE_CB_PARAMS,
                          "planParams": {}
                          }
            self.name = self.index_definition['name'] = \
                                self.index_definition['sourceName'] = name
            fts_node = self.get_nodes_from_services_map("fts", \
                                servers=self.get_nodes_in_cluster_after_upgrade())
            if fts_node:
                rest = RestConnection(fts_node)
                status, _ = rest.get_fts_index_definition(self.name)
                if status != 400:
                    rest.delete_fts_index(self.name)
                self.log.info("Creating {0} {1} on {2}".format(self.index_type,
                                                           self.name, rest.ip))
                rest.create_fts_index(self.name, self.index_definition)
            else:
                raise Exception("No FTS node in cluster")
            self.ops_dist_map = self.calculate_data_change_distribution(
                create_per=self.create_ops_per, update_per=self.update_ops_per,
                delete_per=self.delete_ops_per, expiry_per=self.expiry_ops_per,
                start=0, end=self.docs_per_day)
            self.log.info(self.ops_dist_map)
            self.dataset = "default"
            self.docs_gen_map = self.generate_ops_docs(self.docs_per_day, 0)
            self.async_ops_all_buckets(self.docs_gen_map, batch_size=100)
        except Exception, ex:
            self.log.info(ex)

    def create_fts_index_query(self, queue=None):
        try:
            self.fts_obj = self.create_fts_index_query_compare()
            return self.fts_obj
        except Exception, ex:
            self.log.info(ex)
            if queue is not None:
                queue.put(False)
        if queue is not None:
            queue.put(True)

    def cluster_stats(self, servers):
        self._wait_for_stats_all_buckets(servers)

    def _initialize_n1ql_helper(self):
        if self.n1ql_helper is None:
            self.n1ql_server = self.get_nodes_from_services_map(service_type = \
                                              "n1ql", servers=self.input.servers)
            self.n1ql_helper = N1QLHelper(version = "sherlock", shell = None,
                use_rest = True, max_verify = self.max_verify,
                buckets = self.buckets, item_flag = None,
                n1ql_port = self.n1ql_server.n1ql_port, full_docs_list = [],
                log = self.log, input = self.input, master = self.master)

    def _get_free_nodes(self):
        self.log.info("Get free nodes in pool not in cluster yet")
        nodes = self.get_nodes_in_cluster_after_upgrade()
        free_nodes = copy.deepcopy(self.input.servers)
        found = False
        for node in nodes:
            for server in free_nodes:
                if str(server.ip).strip() == str(node.ip).strip():
                    self.log.info("this node {0} is in cluster".format(server))
                    free_nodes.remove(server)
                    found = True
        if not free_nodes:
            self.log.info("no free node")
            return free_nodes
        else:
            self.log.info("here is the list of free nodes {0}"\
                                       .format(free_nodes))
            return free_nodes

    def get_nodes_in_cluster_after_upgrade(self, master_node=None):
        rest = None
        if master_node is None:
            rest = RestConnection(self.master)
        else:
            rest = RestConnection(master_node)
        nodes = rest.node_statuses()
        server_set = []
        for node in nodes:
            for server in self.input.servers:
                if server.ip == node.ip:
                    server_set.append(server)
        return server_set