1import Queue
2import copy, json
3from newupgradebasetest import NewUpgradeBaseTest
4from remote.remote_util import RemoteMachineShellConnection, RemoteUtilHelper
5from couchbase_helper.documentgenerator import BlobGenerator
6from membase.api.rest_client import RestConnection, RestHelper
7from membase.api.exception import RebalanceFailedException
8from membase.helper.cluster_helper import ClusterOperationHelper
9from memcached.helper.kvstore import KVStore
10from fts.stable_topology_fts import StableTopFTS
11from pytests.fts.fts_callable import FTSCallable
12from cbas.cbas_functional_tests import CBASFunctionalTests
13# from 2i.indexscans_2i import SecondaryIndexingScanTests
14from testconstants import COUCHBASE_VERSION_2
15from testconstants import COUCHBASE_VERSION_3, COUCHBASE_FROM_VERSION_3
16from testconstants import SHERLOCK_VERSION, COUCHBASE_FROM_SHERLOCK,\
17                          COUCHBASE_FROM_SPOCK, COUCHBASE_FROM_WATSON,\
18                          COUCHBASE_FROM_VULCAN
19from couchbase.cluster import Cluster, PasswordAuthenticator
20from couchbase.exceptions import CouchbaseError,CouchbaseNetworkError,CouchbaseTransientError
21from security.rbac_base import RbacBase
22
23class SingleNodeUpgradeTests(NewUpgradeBaseTest):
24    def setUp(self):
25        super(SingleNodeUpgradeTests, self).setUp()
26        self.queue = Queue.Queue()
27
28    def tearDown(self):
29        super(SingleNodeUpgradeTests, self).tearDown()
30        if self.input.param("op", None) == "close_port":
31            remote = RemoteMachineShellConnection(self.master)
32            remote.disable_firewall()
33
34    def test_upgrade(self):
35        self._install([self.master])
36        self.operations([self.master])
37        for upgrade_version in self.upgrade_versions:
38            self.sleep(self.sleep_time, "Pre-setup of old version is done. Wait for upgrade to {0} version". \
39                       format(upgrade_version))
40            upgrade_threads = self._async_update(upgrade_version, [self.master])
41            # wait upgrade statuses
42            for upgrade_thread in upgrade_threads:
43                upgrade_thread.join()
44            success_upgrade = True
45            while not self.queue.empty():
46                success_upgrade &= self.queue.get()
47            if not success_upgrade:
48                self.fail("Upgrade failed!")
49            self.add_built_in_server_user(node=self.master)
50            self.sleep(self.expire_time)
51            #            if not self.is_linux:
52            #                self.wait_node_restarted(self.master, wait_time=1200, wait_if_warmup=True, check_service=True)
53            remote = RemoteMachineShellConnection(self.master)
54            for bucket in self.buckets:
55                remote.execute_cbepctl(bucket, "", "set flush_param", "exp_pager_stime", 5)
56            remote.disconnect()
57            self.sleep(30)
58            self.verification([self.master])
59
60    def test_upgrade_negative(self):
61        op = self.input.param("op", None)
62        error = self.input.param("error", '')
63        remote = RemoteMachineShellConnection(self.master)
64        if op is None:
65            self.fail("operation should be specified")
66        if op == "higher_version":
67            tmp = self.initial_version
68            self.initial_version = self.upgrade_versions[0]
69            self.upgrade_versions = [tmp, ]
70        info = None
71        if op == "wrong_arch":
72            info = remote.extract_remote_info()
73            info.architecture_type = ('x86_64', 'x86')[info.architecture_type == 'x86']
74        self._install([self.master])
75        self.operations([self.master])
76        try:
77            if op == "close_port":
78                RemoteUtilHelper.enable_firewall(self.master)
79            for upgrade_version in self.upgrade_versions:
80                self.sleep(self.sleep_time, "Pre-setup of old version is done. Wait for upgrade to {0} version". \
81                           format(upgrade_version))
82                output, error = self._upgrade(upgrade_version, self.master, info=info)
83                if str(output).find(error) != -1 or str(error).find(error) != -1:
84                    raise Exception(error)
85        except Exception, ex:
86            self.log.info("Exception %s appeared as expected" % ex)
87            self.log.info("Check that old version is working fine")
88            self.verification([self.master])
89        else:
90            self.fail("Upgrade should fail!")
91        remote.disconnect()
92
93
94class Upgrade_Utils(NewUpgradeBaseTest):
95    def setUp(self):
96        super(Upgrade_Utils, self).setUp()
97        self.nodes_init = self.input.param('nodes_init', 2)
98        self.queue = Queue.Queue()
99
100    def tearDown(self):
101        print "teardown done"
102
103    def add_and_rebalance(self):
104        self._install(self.servers[:self.nodes_init])
105        self.operations(self.servers[:self.nodes_init])
106        self.initial_version = self.upgrade_versions[0]
107        self.product = 'couchbase-server'
108        self.sleep(self.sleep_time, "Pre-setup of old version is done. Wait for online upgrade to {0} version". \
109                   format(self.initial_version))
110        self._install(self.servers[self.nodes_init:self.num_servers])
111
112
113class MultiNodesUpgradeTests(NewUpgradeBaseTest):
114    def setUp(self):
115        super(MultiNodesUpgradeTests, self).setUp()
116        self.nodes_init = self.input.param('nodes_init', 2)
117        self.queue = Queue.Queue()
118        self.rate_limit = self.input.param("rate_limit", 100000)
119        self.batch_size = self.input.param("batch_size", 1000)
120        self.doc_size = self.input.param("doc_size", 100)
121        self.loader = self.input.param("loader", "high_doc_ops")
122        self.instances = self.input.param("instances", 4)
123        self.threads = self.input.param("threads", 5)
124        self.use_replica_to = self.input.param("use_replica_to",False)
125
    def tearDown(self):
        # No cleanup of our own; delegate to the base test teardown.
        super(MultiNodesUpgradeTests, self).tearDown()
128
129    def offline_cluster_upgrade(self):
130        self._install(self.servers[:self.nodes_init])
131        self.operations(self.servers[:self.nodes_init])
132        seqno_expected = 1
133        if self.ddocs_num:
134            self.create_ddocs_and_views()
135            if self.input.param('run_index', False):
136                self.verify_all_queries()
137        if not self.initial_version.startswith("1.") and self.input.param('check_seqno', True):
138            self.check_seqno(seqno_expected)
139        if self.during_ops:
140            for opn in self.during_ops:
141                if opn != 'add_back_failover':
142                    getattr(self, opn)()
143        num_stoped_nodes = self.input.param('num_stoped_nodes', self.nodes_init)
144        upgrade_nodes = self.servers[self.nodes_init - num_stoped_nodes:self.nodes_init]
145        for upgrade_version in self.upgrade_versions:
146            self.sleep(self.sleep_time, "Pre-setup of old version is done. Wait for upgrade to {0} version". \
147                       format(upgrade_version))
148            for server in upgrade_nodes:
149                remote = RemoteMachineShellConnection(server)
150                remote.stop_server()
151                self.sleep(self.sleep_time)
152                if self.wait_expire:
153                    self.sleep(self.expire_time)
154                if self.input.param('remove_manifest_files', False):
155                    for file in ['manifest.txt', 'manifest.xml', 'VERSION.txt,']:
156                        output, error = remote.execute_command("rm -rf /opt/couchbase/{0}".format(file))
157                        remote.log_command_output(output, error)
158                if self.input.param('remove_config_files', False):
159                    for file in ['config', 'couchbase-server.node', 'ip', 'couchbase-server.cookie']:
160                        output, error = remote.execute_command(
161                            "rm -rf /opt/couchbase/var/lib/couchbase/{0}".format(file))
162                        remote.log_command_output(output, error)
163                    self.buckets = []
164                remote.disconnect()
165            upgrade_threads = self._async_update(upgrade_version, upgrade_nodes)
166            # wait upgrade statuses
167            for upgrade_thread in upgrade_threads:
168                upgrade_thread.join()
169            success_upgrade = True
170            while not self.queue.empty():
171                success_upgrade &= self.queue.get()
172            if not success_upgrade:
173                self.fail("Upgrade failed. See logs above!")
174            self.add_built_in_server_user()
175            self.sleep(self.expire_time)
176            if self.during_ops:
177                if "add_back_failover" in self.during_ops:
178                    getattr(self, 'add_back_failover')()
179                    self.cluster.rebalance(self.servers[:self.nodes_init], [], [])
180                elif "failover" in self.during_ops:
181                    self.cluster.rebalance(self.servers[:self.nodes_init], [], [])
182                    rem = [server for server in self.servers[:self.nodes_init]
183                           if self.failover_node.ip == server.ip and str(self.failover_node.port) == server.port]
184                    self.dcp_rebalance_in_offline_upgrade_from_version2()
185                    self.verification(list(set(self.servers[:self.nodes_init]) - set(rem)))
186                    return
187            self.dcp_rebalance_in_offline_upgrade_from_version2()
188            self._create_ephemeral_buckets()
189            self.verification(self.servers[:self.nodes_init])
190            if self.input.param('check_seqno', True):
191                self.check_seqno(seqno_expected)
192
    def offline_cluster_upgrade_and_reboot(self):
        """Offline upgrade followed by a restart of the upgraded nodes.

        Stops the last num_stoped_nodes servers, upgrades them, bounces
        couchbase on each of them, then verifies the whole cluster.
        """
        self._install(self.servers[:self.nodes_init])
        self.operations(self.servers[:self.nodes_init])
        if self.ddocs_num:
            self.create_ddocs_and_views()
        if self.during_ops:
            for opn in self.during_ops:
                getattr(self, opn)()
        num_stoped_nodes = self.input.param('num_stoped_nodes', self.nodes_init)
        stoped_nodes = self.servers[self.nodes_init - num_stoped_nodes:self.nodes_init]
        for upgrade_version in self.upgrade_versions:
            self.sleep(self.sleep_time, "Pre-setup of old version is done. Wait for upgrade to {0} version". \
                       format(upgrade_version))
            # stop the servers that are about to be upgraded
            for server in stoped_nodes:
                remote = RemoteMachineShellConnection(server)
                remote.stop_server()
                remote.disconnect()
            self.sleep(self.sleep_time)
            upgrade_threads = self._async_update(upgrade_version, stoped_nodes)
            for upgrade_thread in upgrade_threads:
                upgrade_thread.join()
            # each upgrade thread reports True/False through self.queue
            success_upgrade = True
            while not self.queue.empty():
                success_upgrade &= self.queue.get()
            if not success_upgrade:
                self.fail("Upgrade failed!")
            self.dcp_rebalance_in_offline_upgrade_from_version2()
            self.add_built_in_server_user()
            # "reboot": bounce couchbase on every upgraded node
            for server in stoped_nodes:
                remote = RemoteMachineShellConnection(server)
                remote.stop_server()
                self.sleep(5)
                remote.start_couchbase()
                remote.disconnect()
            ClusterOperationHelper.wait_for_ns_servers_or_assert(stoped_nodes, self)
            self.log.info("Need to sleep 15 seconds for couchbase server startup completely")
            self.sleep(15)
            self._create_ephemeral_buckets()
            self.verification(self.servers[:self.nodes_init])
232
    def offline_cluster_upgrade_and_rebalance(self):
        """Offline upgrade racing a rebalance.

        Starts an upgrade of the stopped nodes and concurrently kicks off a
        rebalance (expected to fail while nodes are down), then performs the
        real rebalance-out once the upgrade completes.
        """
        num_stoped_nodes = self.input.param('num_stoped_nodes', self.nodes_init)
        stoped_nodes = self.servers[self.nodes_init - num_stoped_nodes:self.nodes_init]
        servs_out = self.servers[self.nodes_init - num_stoped_nodes - self.nodes_out:self.nodes_init - num_stoped_nodes]
        servs_in = self.servers[self.nodes_init:self.nodes_init + self.nodes_in]
        self._install(self.servers)
        self.operations(self.servers[:self.nodes_init])
        if self.ddocs_num:
            self.create_ddocs_and_views()
        if self.during_ops:
            for opn in self.during_ops:
                getattr(self, opn)()
        for upgrade_version in self.upgrade_versions:
            self.sleep(self.sleep_time, "Pre-setup of old version is done. Wait for upgrade to {0} version". \
                       format(upgrade_version))
            for server in stoped_nodes:
                remote = RemoteMachineShellConnection(server)
                remote.stop_server()
                remote.disconnect()
            upgrade_threads = self._async_update(upgrade_version, stoped_nodes)
            try:
                # rebalance while nodes are stopped is expected to fail
                self.cluster.rebalance(self.servers[:self.nodes_init], servs_in, servs_out)
            except RebalanceFailedException:
                self.log.info("rebalance failed as expected")
            for upgrade_thread in upgrade_threads:
                upgrade_thread.join()
            success_upgrade = True
            while not self.queue.empty():
                success_upgrade &= self.queue.get()
            if not success_upgrade:
                self.fail("Upgrade failed!")
            ClusterOperationHelper.wait_for_ns_servers_or_assert(stoped_nodes, self)
            self.cluster.rebalance(self.servers[:self.nodes_init], [], servs_out)
            self.dcp_rebalance_in_offline_upgrade_from_version2()
            self._create_ephemeral_buckets()
            self.verification(list(set(self.servers[:self.nodes_init] + servs_in) - set(servs_out)))
269
270    def offline_cluster_upgrade_non_default_path(self):
271        try:
272            num_nodes_with_not_default = self.input.param('num_nodes_with_not_default', 1)
273            prefix_path = ''
274            if not self.is_linux:
275                prefix_path = "C:"
276            data_path = prefix_path + self.input.param('data_path', '/tmp/data').replace('|', "/")
277            index_path = self.input.param('index_path', data_path).replace('|', "/")
278            if not self.is_linux and not index_path.startswith("C:"):
279                index_path = prefix_path + index_path
280            num_nodes_remove_data = self.input.param('num_nodes_remove_data', 0)
281            servers_with_not_default = self.servers[:num_nodes_with_not_default]
282            old_paths = {}
283            for server in servers_with_not_default:
284                # to restore servers paths in finally
285                old_paths[server.ip] = [server.data_path, server.index_path]
286                server.data_path = data_path
287                server.index_path = index_path
288                shell = RemoteMachineShellConnection(server)
289                # shell.remove_folders([data_path, index_path])
290                for path in set([data_path, index_path]):
291                    shell.create_directory(path)
292                shell.disconnect()
293            self._install(self.servers[:self.nodes_init])
294            self.operations(self.servers[:self.nodes_init])
295            if self.ddocs_num and not self.input.param('extra_verification', False):
296                self.create_ddocs_and_views()
297            for upgrade_version in self.upgrade_versions:
298                self.sleep(self.sleep_time, "Pre-setup of old version is done. Wait for upgrade to {0} version". \
299                           format(upgrade_version))
300                for server in self.servers[:self.nodes_init]:
301                    remote = RemoteMachineShellConnection(server)
302                    remote.stop_server()
303                    remote.disconnect()
304                self.sleep(self.sleep_time)
305                # remove data for nodes with non default data paths
306                tmp = min(num_nodes_with_not_default, num_nodes_remove_data)
307                self.delete_data(self.servers[:tmp], [data_path + "/*", index_path + "/*"])
308                # remove data for nodes with default data paths
309                self.delete_data(self.servers[tmp: max(tmp, num_nodes_remove_data)],
310                                 ["/opt/couchbase/var/lib/couchbase/data/*"])
311                upgrade_threads = self._async_update(upgrade_version, self.servers[:self.nodes_init])
312                for upgrade_thread in upgrade_threads:
313                    upgrade_thread.join()
314                success_upgrade = True
315                while not self.queue.empty():
316                    success_upgrade &= self.queue.get()
317                if not success_upgrade:
318                    self.fail("Upgrade failed!")
319                self.dcp_rebalance_in_offline_upgrade_from_version2()
320                self.add_built_in_server_user()
321                self.sleep(self.expire_time)
322                for server in servers_with_not_default:
323                    rest = RestConnection(server)
324                    node = rest.get_nodes_self()
325                    print node.storage[0]
326                    self.assertEqual(node.storage[0].path.lower(), data_path.lower(),
327                                     "Server %s. Data path expected:%s, actual %s." % (
328                                         server.ip, data_path, node.storage[0].path))
329                    self.assertEqual(node.storage[0].index_path.lower(), index_path.lower(),
330                                     "Server %s. Index path expected: %s, actual: %s." % (
331                                         server.ip, index_path, node.storage[0].index_path))
332            if num_nodes_remove_data:
333                for bucket in self.buckets:
334                    if self.rest_helper.bucket_exists(bucket):
335                        raise Exception("bucket: %s still exists" % bucket.name)
336                self.buckets = []
337
338            if self.input.param('extra_verification', False):
339                self.bucket_size = 100
340                self._create_sasl_buckets(self.master, 1)
341                self._create_standard_buckets(self.master, 1)
342                if self.ddocs_num:
343                    self.create_ddocs_and_views()
344                    gen_load = BlobGenerator('upgrade', 'upgrade-', self.value_size, end=self.num_items)
345                    self._load_all_buckets(self.master, gen_load, "create", self.expire_time, flag=self.item_flag)
346            self._create_ephemeral_buckets()
347            self.verification(self.servers[:self.nodes_init], check_items=not num_nodes_remove_data)
348        finally:
349            for server in servers_with_not_default:
350                server.data_path = old_paths[server.ip][0]
351                server.index_path = old_paths[server.ip][1]
352
353    def offline_cluster_upgrade_with_reinstall(self):
354        self._install(self.servers[:self.nodes_init])
355        self.operations(self.servers[:self.nodes_init])
356        if self.ddocs_num:
357            self.create_ddocs_and_views()
358        if self.during_ops:
359            for opn in self.during_ops:
360                getattr(self, opn)()
361        num_nodes_reinstall = self.input.param('num_nodes_reinstall', 1)
362        stoped_nodes = self.servers[self.nodes_init - (self.nodes_init - num_nodes_reinstall):self.nodes_init]
363        nodes_reinstall = self.servers[:num_nodes_reinstall]
364        for upgrade_version in self.upgrade_versions:
365            self.sleep(self.sleep_time, "Pre-setup of old version is done. Wait for upgrade to {0} version". \
366                       format(upgrade_version))
367            for server in stoped_nodes:
368                remote = RemoteMachineShellConnection(server)
369                remote.stop_server()
370                remote.disconnect()
371            self.sleep(self.sleep_time)
372            upgrade_threads = self._async_update(upgrade_version, stoped_nodes)
373            self.force_reinstall(nodes_reinstall)
374            for upgrade_thread in upgrade_threads:
375                upgrade_thread.join()
376            success_upgrade = True
377            while not self.queue.empty():
378                success_upgrade &= self.queue.get()
379            if not success_upgrade:
380                self.fail("Upgrade failed!")
381            self.dcp_rebalance_in_offline_upgrade_from_version2()
382            self._create_ephemeral_buckets()
383            self.verification(self.servers[:self.nodes_init])
384
    def online_upgrade_rebalance_in_with_ops(self):
        """Online upgrade via rebalance-in while client load keeps running.

        Installs the new version on the extra servers, then swaps them in
        with an async rebalance while repeatedly re-loading the documents.
        """
        gen_load = BlobGenerator('upgrade', 'upgrade-', self.value_size, end=self.num_items)

        def modify_data():
            # (re)create the document set on every bucket and wait for it
            tasks_ops = []
            for bucket in self.buckets:
                gen = copy.deepcopy(gen_load)
                tasks_ops.append(self.cluster.async_load_gen_docs(self.master, bucket.name, gen,
                                                                  bucket.kvs[1], "create"))
            for task in tasks_ops:
                task.result()

        self._install(self.servers[:self.nodes_init])
        self.operations(self.servers[:self.nodes_init])
        if self.ddocs_num:
            self.create_ddocs_and_views()
        self.initial_version = self.upgrade_versions[0]
        self.sleep(self.sleep_time, "Pre-setup of old version is done. Wait for online upgrade to {0} version". \
                   format(self.initial_version))
        self.product = 'couchbase-server'
        servs_in = self.servers[self.nodes_init:self.nodes_in + self.nodes_init]
        servs_out = self.servers[self.nodes_init - self.nodes_out:self.nodes_init]

        if self.initial_build_type == "community" and self.upgrade_build_type == "enterprise":
            self._install(self.servers[self.nodes_init:self.num_servers], community_to_enterprise=True)
        else:
            self._install(self.servers[self.nodes_init:self.num_servers])
        self.sleep(self.sleep_time, "Installation of new version is done. Wait for rebalance")

        modify_data()
        task_reb = self.cluster.async_rebalance(self.servers[:self.nodes_init], servs_in, servs_out)
        self.add_built_in_server_user()
        # keep mutating data until the rebalance task finishes
        while task_reb.state != "FINISHED":
            modify_data()
        task_reb.result()
        self._create_ephemeral_buckets()
        self.verification(list((set(self.servers[:self.nodes_init]) | set(servs_in)) - set(servs_out)))
422
    def online_upgrade_rebalance_in_out(self):
        """Online upgrade by swapping in new-version nodes and swapping out
        all old-version nodes (via self.online_upgrade), then verifying on
        the new nodes.

        Seqno check uses '==' only when seqnos were also verified before the
        upgrade; otherwise '>=' is used.
        """
        self._install(self.servers[:self.nodes_init])
        self.operations(self.servers[:self.nodes_init])
        seqno_expected = 1
        seqno_comparator = '>='
        if not self.initial_version.startswith("1.") and self.input.param('check_seqno', True):
            self.check_seqno(seqno_expected)
            seqno_comparator = '=='
        if self.ddocs_num:
            self.create_ddocs_and_views()
        self.initial_version = self.upgrade_versions[0]
        self.sleep(self.sleep_time, "Pre-setup of old version is done. Wait for online upgrade to {0} version". \
                   format(self.initial_version))
        self.product = 'couchbase-server'
        if self.initial_build_type == "community" and self.upgrade_build_type == "enterprise":
            self._install(self.servers[self.nodes_init:self.num_servers], community_to_enterprise=True)
        else:
            self._install(self.servers[self.nodes_init:self.num_servers])
        self.sleep(self.sleep_time, "Installation of new version is done. Wait for rebalance")
        if self.during_ops:
            for opn in self.during_ops:
                getattr(self, opn)()
        if self.wait_expire:
            # let items expire, then reset the client-side KV stores to match
            self.sleep(self.expire_time)
            for bucket in self.buckets:
                bucket.kvs[1] = KVStore()
        self.online_upgrade()
        self.add_built_in_server_user()
        self.sleep(10)

        if self.input.param('initial_version', '')[:5] in COUCHBASE_FROM_VERSION_3:
            self.master = self.servers[self.nodes_init: self.num_servers][0]
        """ verify DCP upgrade in 3.0.0 version """
        self.monitor_dcp_rebalance()

        if self.input.param('reboot_cluster', False):
            self.warm_up_node(self.servers[self.nodes_init: self.num_servers])
        self._create_ephemeral_buckets()
        self.verification(self.servers[self.nodes_init: self.num_servers])
        if self.input.param('check_seqno', True):
            self.check_seqno(seqno_expected, comparator=seqno_comparator)
464
    def online_upgrade_incremental(self):
        """Upgrade the cluster one node at a time: rebalance a node out,
        install the new version on it, rebalance it back in and verify
        after every swap; finally rebalance out the remaining old node."""
        self._install(self.servers)
        self.operations(self.servers)
        if self.ddocs_num:
            self.create_ddocs_and_views()
        for server in self.servers[1:]:
            self.cluster.rebalance(self.servers, [], [server])
            self.initial_version = self.upgrade_versions[0]
            self.sleep(self.sleep_time, "Pre-setup of old version is done. Wait for online upgrade to {0} version". \
                       format(self.initial_version))
            self.product = 'couchbase-server'
            self._install([server])
            self.sleep(self.sleep_time, "Installation of new version is done. Wait for rebalance")
            self.cluster.rebalance(self.servers, [server], [])
            self.log.info("Rebalanced in upgraded nodes")
            self.sleep(self.sleep_time)
            self.verification(self.servers)
        # the first server still runs the old version; retire it
        self._new_master(self.servers[1])
        self.cluster.rebalance(self.servers, [], [self.servers[0]])
        self.log.info("Rebalanced out all old version nodes")
        self.sleep(10)

        """ verify DCP upgrade in 3.0.0 version """
        self.monitor_dcp_rebalance()
        self._create_ephemeral_buckets()
        self.verification(self.servers[1:])
491
    def online_consequentially_upgrade(self):
        """Upgrade one half of the cluster online, swap it in for the old
        half, then upgrade the old half and rebalance it back in.

        server.port may be temporarily pointed at a non-default port for
        the new half; ports are restored to 8091 in the finally block.
        """
        half_node = len(self.servers) / 2
        self._install(self.servers[:half_node])
        self.operations(self.servers[:half_node])
        if self.ddocs_num:
            self.create_ddocs_and_views()
        self.initial_version = self.upgrade_versions[0]
        self.sleep(self.sleep_time, "Pre-setup of old version is done. Wait for online upgrade to {0} version". \
                   format(self.initial_version))
        self.product = 'couchbase-server'
        self._install(self.servers[half_node:])
        self.sleep(self.sleep_time, "Installation of new version is done. Wait for rebalance")
        self.log.info("Rebalanced in upgraded nodes and rebalanced out nodes with old version")
        self.cluster.rebalance(self.servers, self.servers[half_node:], self.servers[:half_node])
        self.sleep(10)

        """ verify DCP upgrade in 3.x.x version """
        self.master = self.servers[half_node]
        self.add_built_in_server_user()
        self.monitor_dcp_rebalance()
        self.sleep(self.sleep_time)
        try:
            # point the new-version half at the configured non-default port
            for server in self.servers[half_node:]:
                if self.port and self.port != '8091':
                    server.port = self.port
            self._new_master(self.servers[half_node])
            self.add_built_in_server_user()
            self.verification(self.servers[half_node:])
            self.log.info("Upgrade nodes of old version")
            upgrade_threads = self._async_update(self.upgrade_versions[0], self.servers[:half_node],
                                                 None, True)
            for upgrade_thread in upgrade_threads:
                upgrade_thread.join()
            success_upgrade = True
            while not self.queue.empty():
                success_upgrade &= self.queue.get()
            if not success_upgrade:
                self.fail("Upgrade failed!")
            self.cluster.rebalance(self.servers[half_node:], self.servers[:half_node], [])
            self.log.info("Rebalanced in all new version nodes")
            self.add_built_in_server_user()
            self.sleep(self.sleep_time)
            self._create_ephemeral_buckets()
            self.verification(self.servers)
        finally:
            # restore the default admin port on every server object
            for server in self.servers:
                server.port = '8091'
539
    def online_upgrade_and_rebalance(self):
        """Online upgrade by rebalancing out the tail nodes, installing the
        new version on them, swapping them back in and retiring the old
        version nodes."""
        self._install(self.servers)
        self.operations(self.servers)
        if self.ddocs_num:
            self.create_ddocs_and_views()
        upgrade_servers = self.servers[self.nodes_init:self.num_servers]
        self.cluster.rebalance(self.servers, [], upgrade_servers)
        self.initial_version = self.upgrade_versions[0]
        self.sleep(self.sleep_time, "Pre-setup of old version is done. Wait for online upgrade to {0} version". \
                   format(self.initial_version))
        if self.initial_build_type == "community" and self.upgrade_build_type == "enterprise":
            self._install(upgrade_servers, community_to_enterprise=True)
        else:
            self._install(upgrade_servers)
        self.sleep(self.sleep_time, "Installation of new version is done. Wait for rebalance")
        self.log.info("Rebalance in new version nodes and rebalance out some nodes")
        self.cluster.rebalance(self.servers, upgrade_servers, self.servers[self.num_servers:])
        self.log.info("Rebalance completed")
        self.log.info("Remove the second old version node")
        self._new_master(self.servers[1])
        self.cluster.rebalance(self.servers, [], [self.servers[0]])
        self.log.info("Rebalance completed")
        self.sleep(10)
        """ verify DCP upgrade in 3.0.0 version """
        self.master = self.servers[self.nodes_init]
        self.monitor_dcp_rebalance()
        self._create_ephemeral_buckets()
        self.verification(self.servers[self.nodes_init: -self.nodes_init])
568
569    def online_upgrade(self, services=None):
570        servers_in = self.servers[self.nodes_init:self.num_servers]
571        self.cluster.rebalance(self.servers[:self.nodes_init], servers_in, [], services=services)
572        self.log.info("Rebalance in all {0} nodes" \
573                      .format(self.input.param("upgrade_version", "")))
574        self.sleep(self.sleep_time)
575        status, content = ClusterOperationHelper.find_orchestrator(self.master)
576        self.assertTrue(status, msg="Unable to find orchestrator: {0}:{1}". \
577                        format(status, content))
578        FIND_MASTER = False
579        for new_server in servers_in:
580            if content.find(new_server.ip) >= 0:
581                self._new_master(new_server)
582                FIND_MASTER = True
583                self.log.info("%s node %s becomes the master" \
584                              % (self.input.param("upgrade_version", ""), new_server.ip))
585                break
586        if self.input.param("initial_version", "")[:5] in COUCHBASE_VERSION_2 \
587                and not FIND_MASTER and not self.is_downgrade:
588            raise Exception( \
589                "After rebalance in {0} nodes, {0} node doesn't become master" \
590                    .format(self.input.param("upgrade_version", "")))
591
592        servers_out = self.servers[:self.nodes_init]
593        self.log.info("Rebalanced out all old version nodes")
594        self.cluster.rebalance(self.servers[:self.num_servers], [], servers_out)
595
    def online_upgrade_swap_rebalance(self):
        """Upgrade the cluster by swap-rebalance: repeatedly swap
        `swap_num_servers` old-version nodes out and the same number of
        new-version nodes in, verifying after each swap that vbuckets moved
        and (for eligible version pairs) that the orchestrator moved to a
        new-version node. Ends with DCP-upgrade monitoring and verification.
        """
        # Install the old version on the initial nodes and load/prepare them.
        self._install(self.servers[:self.nodes_init])
        self.operations(self.servers[:self.nodes_init])
        self.initial_version = self.upgrade_versions[0]
        self.product = 'couchbase-server'
        self.sleep(self.sleep_time, "Pre-setup of old version is done. Wait for online upgrade to {0} version". \
                   format(self.initial_version))
        # community -> enterprise needs a special install path.
        if self.initial_build_type == "community" and self.upgrade_build_type == "enterprise":
            self._install(self.servers[self.nodes_init:self.num_servers], community_to_enterprise=True)
        else:
            self._install(self.servers[self.nodes_init:self.num_servers])
        self.sleep(self.sleep_time, "Installation of new version is done. Wait for rebalance")
        self.swap_num_servers = self.input.param('swap_num_servers', 1)
        old_servers = self.servers[:self.nodes_init]
        # Snapshot the vbucket layout before any swap so each iteration can
        # compare the layout before/after its rebalance.
        new_vb_nums = RestHelper(RestConnection(self.master))._get_vbuckets(old_servers,
                                                                            bucket_name=self.buckets[0].name)
        new_servers = []
        # nodes_init / swap_num_servers swaps replace the whole cluster
        # (integer division: assumes swap_num_servers divides nodes_init).
        for i in range(self.nodes_init / self.swap_num_servers):
            old_vb_nums = copy.deepcopy(new_vb_nums)
            # i-th batch of new-version nodes in, i-th batch of old ones out.
            servers_in = self.servers[(self.nodes_init + i * self.swap_num_servers):
            (self.nodes_init + (i + 1) * self.swap_num_servers)]
            servers_out = self.servers[(i * self.swap_num_servers):((i + 1) * self.swap_num_servers)]
            servers = old_servers + new_servers
            self.log.info("Swap rebalance: rebalance out %s old version nodes, rebalance in %s 2.0 Nodes"
                          % (self.swap_num_servers, self.swap_num_servers))
            self.cluster.rebalance(servers, servers_in, servers_out)
            self.sleep(self.sleep_time)
            # Recompute cluster membership after the swap.
            old_servers = self.servers[((i + 1) * self.swap_num_servers):self.nodes_init]
            new_servers = new_servers + servers_in
            servers = old_servers + new_servers
            new_vb_nums = RestHelper(RestConnection(self.master))._get_vbuckets(servers,
                                                                                bucket_name=self.buckets[0].name)
            # Assert vbuckets actually migrated between the two snapshots.
            self._verify_vbucket_nums_for_swap(old_vb_nums, new_vb_nums)
            status, content = ClusterOperationHelper.find_orchestrator(servers[0])
            self.assertTrue(status, msg="Unable to find orchestrator: {0}:{1}". \
                            format(status, content))
            FIND_MASTER = False
            # The orchestrator should land on a new-version node.
            for new_server in new_servers:
                if content.find(new_server.ip) >= 0:
                    self._new_master(new_server)
                    FIND_MASTER = True
                    self.log.info("3.0 Node %s becomes the master" % (new_server.ip))
            if not FIND_MASTER and not self.is_downgrade:
                # Only fail for version pairs where the new node is expected
                # to take over orchestration (3.x->Sherlock, 2.x->3.x).
                if self.input.param("initial_version", "")[:5] in COUCHBASE_VERSION_3 \
                        and self.input.param("upgrade_version", "")[:5] in SHERLOCK_VERSION:
                    raise Exception("After rebalance in {0} nodes, {0} node doesn't become" \
                                    " the master ".format(self.input.param("upgrade_version", "")[:5]))
                elif self.input.param("initial_version", "")[:5] in COUCHBASE_VERSION_2 \
                        and self.input.param("upgrade_version", "")[:5] in COUCHBASE_VERSION_3:
                    raise Exception("After rebalance in {0} nodes, {0} node doesn't become" \
                                    " the master ".format(self.input.param("upgrade_version", "")[:5]))

        """ verify DCP upgrade in 3.0.0 version """
        self.monitor_dcp_rebalance()
        self.add_built_in_server_user()
        self._create_ephemeral_buckets()
        self.verification(new_servers)
653
654    def online_upgrade_add_services(self):
655        half_node = len(self.servers) / 2
656        self._install(self.servers[:2])
657        self.operations(self.servers[:2])
658        if self.ddocs_num:
659            self.create_ddocs_and_views()
660        self.initial_version = self.upgrade_versions[0]
661        self.sleep(self.sleep_time, "Pre-setup of old version is done. "
662                                    " Wait for online upgrade to {0} version"
663                   .format(self.initial_version))
664        self.product = 'couchbase-server'
665        self._install(self.servers[2:])
666        self.sleep(self.sleep_time, "Installation of new version is done."
667                                    " Wait for rebalance")
668        self.log.info("Rebalanced in upgraded nodes and rebalanced out "
669                      "nodes with old version")
670        self.cluster.rebalance(self.servers, self.servers[2:],
671                               self.servers[:2])
672        self.sleep(10)
673
674        """ verify DCP upgrade in 3.x.x version """
675        self.master = self.servers[2]
676        self.monitor_dcp_rebalance()
677        self.sleep(self.sleep_time)
678        try:
679            for server in self.servers[2:]:
680                if self.port and self.port != '8091':
681                    server.port = self.port
682            self._new_master(self.servers[2])
683            self.verification(self.servers[2:])
684            self.log.info("Upgrade nodes of old version")
685            upgrade_threads = self._async_update(self.upgrade_versions[0],
686                                                 self.servers[:2], None, True)
687            for upgrade_thread in upgrade_threads:
688                upgrade_thread.join()
689            success_upgrade = True
690            while not self.queue.empty():
691                success_upgrade &= self.queue.get()
692            if not success_upgrade:
693                self.fail("Upgrade failed!")
694            self.cluster.rebalance(self.servers[2:],
695                                   self.servers[:2], [])
696            self.log.info("Rebalanced in all new version nodes")
697            self.sleep(self.sleep_time)
698            self._create_ephemeral_buckets()
699            self.verification(self.servers)
700        finally:
701            for server in self.servers:
702                server.port = '8091'
703        """ init_nodes=False so we could set service on it"""
704        self.init_nodes = False
705        self._install(self.servers[-1])
706        self.cluster.rebalance(self.servers[:4],
707                               self.servers[-1], [])
708        # SecondaryIndexingScanTests().test_multi_create_query_explain_drop_index()
709        # we could add more tests later
710
    def test_swap_rebalance_with_services(self):
        """Swap-rebalance upgrade of a service-aware cluster (kv,kv,index,n1ql),
        driven by ``online_upgrade``. Optional params control post-upgrade
        actions: after_upgrade_services_in (upgrade + rebalance in one more
        node with services), after_upgrade_buckets_in (create extra buckets),
        after_upgrade_buckets_out (delete buckets),
        after_upgrade_buckets_flush (flush buckets).
        """
        after_upgrade_services_in = self.input.param("after_upgrade_services_in", False)
        after_upgrade_buckets_in = self.input.param("after_upgrade_buckets_in", False)
        after_upgrade_buckets_out = self.input.param("after_upgrade_buckets_out", False)
        after_upgrade_buckets_flush = self.input.param("after_upgrade_buckets_flush", False)

        self.swap_num_servers = self.input.param('swap_num_servers', 4)
        # Install initial version on the specified nodes
        self._install(self.servers[:self.nodes_init])
        # Configure the nodes with services
        self.operations(self.servers[:self.nodes_init], services="kv,kv,index,n1ql")
        # get the n1ql node which will be used in pre,during and post upgrade for running n1ql commands
        self.n1ql_server = self.get_nodes_from_services_map(
            service_type="n1ql")
        # Run the pre upgrade operations, typically creating index
        self.pre_upgrade(self.servers[:self.nodes_init])
        if self.ddocs_num:
            self.create_ddocs_and_views()
        # set the upgrade version
        self.initial_version = self.upgrade_versions[0]
        self.sleep(self.sleep_time, "Pre-setup of old version is done. Wait for online upgrade to {0} version". \
                   format(self.initial_version))
        self.product = 'couchbase-server'
        # install the new version on different set of servers
        # NOTE(review): the community->enterprise branch installs through
        # num_servers while the else branch stops at nodes_init +
        # swap_num_servers — confirm the asymmetry is intended.
        if self.initial_build_type == "community" and self.upgrade_build_type == "enterprise":
            self._install(self.servers[self.nodes_init:self.num_servers], community_to_enterprise=True)
        else:
            self._install(self.servers[self.nodes_init:self.nodes_init + self.swap_num_servers])
        self.sleep(self.sleep_time, "Installation of new version is done. Wait for rebalance")
        if self.during_ops:
            # during_ops names methods on self to run mid-upgrade.
            for opn in self.during_ops:
                getattr(self, opn)()
        if self.wait_expire:
            # Let loaded items expire, then reset the client-side KV store
            # so verification does not expect the expired items.
            self.sleep(self.expire_time)
            for bucket in self.buckets:
                bucket.kvs[1] = KVStore()
        # swap and rebalance the servers
        self.online_upgrade(services=["kv", "kv", "index", "n1ql"])
        self.n1ql_server = self.get_nodes_from_services_map(
            service_type="n1ql", servers=self.servers[self.nodes_init:self.num_servers])
        # Run the post_upgrade operations
        self._create_ephemeral_buckets()
        self.post_upgrade(self.servers[self.nodes_init:self.num_servers])

        # Add new services after the upgrade
        if after_upgrade_services_in is not False:
            for upgrade_version in self.upgrade_versions:
                if self.initial_build_type == "community" and self.upgrade_build_type == "enterprise":
                    upgrade_threads = self._async_update(upgrade_version, [self.servers[self.nodes_init]],
                                                         save_upgrade_config=True)
                else:
                    upgrade_threads = self._async_update(upgrade_version, [self.servers[self.nodes_init]])

                for upgrade_thread in upgrade_threads:
                    upgrade_thread.join()
                # Each upgrade thread reports success/failure via the queue.
                success_upgrade = True
                while not self.queue.empty():
                    success_upgrade &= self.queue.get()
                if not success_upgrade:
                    self.fail("Upgrade failed. See logs above!")
                self.sleep(self.expire_time)
                self.cluster.rebalance(self.servers[:self.nodes_init], [self.servers[self.nodes_init]], [],
                                       services=["kv", "index", "n1ql"])
        # creating new buckets after upgrade
        if after_upgrade_buckets_in is not False:
            self.bucket_size = 100
            self._create_sasl_buckets(self.master, 1)
            self._create_standard_buckets(self.master, 1)
            if self.ddocs_num:
                self.create_ddocs_and_views()
                gen_load = BlobGenerator('upgrade', 'upgrade-', self.value_size, end=self.num_items)
                self._load_all_buckets(self.master, gen_load, "create", self.expire_time, flag=self.item_flag)
        # deleting buckets after upgrade
        if after_upgrade_buckets_out is not False:
            self._all_buckets_delete(self.master)
        # flushing buckets after upgrade
        if after_upgrade_buckets_flush is not False:
            self._all_buckets_flush()
789
    def test_regular_rebalance_with_services(self):
        """Upgrade an 8-node service-aware cluster in two halves via regular
        (non-swap) rebalance: rebalance one half out, install the new version
        on it, rebalance each node back in with its service; then repeat for
        the other half. Optional after_upgrade_* params add services or
        create/delete/flush buckets afterwards.
        """
        after_upgrade_services_in = self.input.param("after_upgrade_services_in", False)
        after_upgrade_buckets_in = self.input.param("after_upgrade_buckets_in", False)
        after_upgrade_buckets_out = self.input.param("after_upgrade_buckets_out", False)
        after_upgrade_buckets_flush = self.input.param("after_upgrade_buckets_flush", False)

        self.swap_num_servers = self.input.param('swap_num_servers', 4)
        # Install initial version on the specified nodes
        self._install(self.servers[:self.nodes_init])
        # Configure the nodes with services
        self.operations(self.servers[:self.nodes_init],
                        services="kv,kv,index,n1ql,kv,kv,index,n1ql")
        # get the n1ql node which will be used in pre,during and post
        # upgrade for running n1ql commands
        self.n1ql_server = self.get_nodes_from_services_map(
            service_type="n1ql")
        # Run the pre upgrade operations, typically creating index
        self.pre_upgrade(self.servers[:self.nodes_init])
        if self.ddocs_num:
            self.create_ddocs_and_views()
        # set the upgrade version
        self.initial_version = self.upgrade_versions[0]
        self.sleep(self.sleep_time, "Pre-setup of old version is done. "\
                                    "Wait for online upgrade to {0} version". \
                                    format(self.initial_version))
        # Second half of the initial nodes goes first.
        upgrade_servers = self.servers[self.swap_num_servers:self.nodes_init]
        # One service per re-added node, in rebalance-in order.
        upgrade_services = ["kv", "kv", "index", "n1ql"]
        # swap out half of the servers out and rebalance
        self.cluster.rebalance(self.servers, [], upgrade_servers)
        # install the new version on the swapped out servers
        self._install(self.servers[self.swap_num_servers:self.nodes_init])
        self.n1ql_server = self.get_nodes_from_services_map(
            service_type="n1ql")
        # Run during upgrade operations
        self.during_upgrade(self.servers[:self.nodes_init])
        # rebalance in the swapped out servers
        for i in range(0, len(upgrade_servers)):
            self.cluster.rebalance(self.servers, [upgrade_servers[i]], [],
                                   services=[upgrade_services[i]])

        # Master must move to an upgraded node before the old half leaves.
        self._new_master(self.servers[self.swap_num_servers])
        # swap out the remaining half of the servers and rebalance
        upgrade_servers = self.servers[:self.swap_num_servers]
        self.cluster.rebalance(self.servers, [], upgrade_servers)
        # install the new version on the swapped out servers
        self._install(self.servers[:self.swap_num_servers])
        for i in range(0, len(upgrade_servers)):
            self.cluster.rebalance(self.servers[self.swap_num_servers:self.nodes_init],
                                   [upgrade_servers[i]], [],
                                   services=[upgrade_services[i]])
        self.sleep(timeout=60)
        self.n1ql_server = self.get_nodes_from_services_map(
            service_type="n1ql")
        # Run the post_upgrade operations
        self._create_ephemeral_buckets()
        self.post_upgrade(self.servers[:self.nodes_init])

        # Add new services after the upgrade
        if after_upgrade_services_in is not False:
            for upgrade_version in self.upgrade_versions:
                if self.initial_build_type == "community" and \
                   self.upgrade_build_type == "enterprise":
                    upgrade_threads = self._async_update(upgrade_version,
                                                         [self.servers[self.nodes_init]],
                                                         save_upgrade_config=True)
                else:
                    upgrade_threads = self._async_update(upgrade_version,
                                                         [self.servers[self.nodes_init]])

                for upgrade_thread in upgrade_threads:
                    upgrade_thread.join()
                # Each upgrade thread reports success/failure via the queue.
                success_upgrade = True
                while not self.queue.empty():
                    success_upgrade &= self.queue.get()
                if not success_upgrade:
                    self.fail("Upgrade failed. See logs above!")
                self.sleep(self.expire_time)
                self.cluster.rebalance(self.servers[:self.nodes_init],
                                       [self.servers[self.nodes_init]], [],
                                       services=["kv", "index", "n1ql"])
        # creating new buckets after upgrade
        if after_upgrade_buckets_in is not False:
            self.bucket_size = 100
            self._create_sasl_buckets(self.master, 1)
            self._create_standard_buckets(self.master, 1)
            if self.ddocs_num:
                self.create_ddocs_and_views()
                gen_load = BlobGenerator('upgrade', 'upgrade-',
                                         self.value_size, end=self.num_items)
                self._load_all_buckets(self.master, gen_load, "create",
                                       self.expire_time, flag=self.item_flag)
        # deleting buckets after upgrade
        if after_upgrade_buckets_out is not False:
            self._all_buckets_delete(self.master)
        # flushing buckets after upgrade
        if after_upgrade_buckets_flush is not False:
            self._all_buckets_flush()
887
    def test_graceful_failover_with_services(self):
        """Upgrade a service-aware cluster via failover + delta/full recovery:
        fail over half the nodes, upgrade them offline, set recovery type and
        rebalance them back; the last node of each half is failed over
        separately so the graceful-failover preconditions stay satisfied.
        Repeats for the other half, then runs post-upgrade checks and the
        optional after_upgrade_* actions.
        """
        after_upgrade_services_in = self.input.param("after_upgrade_services_in", False)
        after_upgrade_buckets_in = self.input.param("after_upgrade_buckets_in", False)
        after_upgrade_buckets_out = self.input.param("after_upgrade_buckets_out", False)
        after_upgrade_buckets_flush = self.input.param("after_upgrade_buckets_flush", False)

        self.swap_num_servers = self.input.param('swap_num_servers', 4)
        # Install initial version on the specified nodes
        self._install(self.servers[:self.nodes_init])
        # Configure the nodes with services
        self.operations(self.servers[:self.nodes_init], services="kv,index,n1ql,kv,kv,index,n1ql,kv")
        # get the n1ql node which will be used in pre,during and post upgrade for running n1ql commands
        self.n1ql_server = self.get_nodes_from_services_map(
            service_type="n1ql")
        # Run the pre upgrade operations, typically creating index
        self.pre_upgrade(self.servers[:self.nodes_init])
        if self.ddocs_num:
            self.create_ddocs_and_views()
        # set the upgrade version
        self.initial_version = self.upgrade_versions[0]
        self.sleep(self.sleep_time, "Pre-setup of old version is done. Wait for online upgrade to {0} version". \
                   format(self.initial_version))

        upgrade_servers = self.servers[self.swap_num_servers:self.nodes_init]
        # do a graceful failover of some nodes
        # NOTE(review): only the first fail_over passes graceful=True; the
        # next two use the default — confirm hard failover is intended there.
        self.rest.fail_over('ns_1@' + upgrade_servers[0].ip, graceful=True)
        self.sleep(timeout=60)
        self.rest.fail_over('ns_1@' + upgrade_servers[1].ip)
        self.sleep(timeout=60)
        self.rest.fail_over('ns_1@' + upgrade_servers[2].ip)
        self.sleep(timeout=60)

        for i in range(0, len(upgrade_servers) - 1):
            # set recovery type of the failed over nodes
            self.rest.set_recovery_type('ns_1@' + upgrade_servers[i].ip, "full")

        # upgrade the failed over nodes with the new version
        upgrade_threads = self._async_update(self.upgrade_versions[0], upgrade_servers[:len(upgrade_servers) - 1])
        # wait upgrade statuses
        for upgrade_thread in upgrade_threads:
            upgrade_thread.join()
        # Each upgrade thread reports success/failure via the queue.
        success_upgrade = True
        while not self.queue.empty():
            success_upgrade &= self.queue.get()
        if not success_upgrade:
            self.fail("Upgrade failed. See logs above!")
        self.sleep(self.expire_time)

        # rebalance in the failed over nodes
        self.cluster.rebalance(self.servers[:self.nodes_init], [], [])

        # failover another node, this is done so that the conditions of graceful failover are met, otherwise
        # hard failover will be implemented
        self.rest.fail_over('ns_1@' + upgrade_servers[3].ip, graceful=True)
        self.sleep(timeout=60)
        # set recovery type of the failed over node
        self.rest.set_recovery_type('ns_1@' + upgrade_servers[3].ip, "full")
        self.sleep(timeout=60)

        # upgrade the failed over node with the new version
        upgrade_threads = self._async_update(self.upgrade_versions[0], [upgrade_servers[len(upgrade_servers) - 1]])
        # wait upgrade statuses
        for upgrade_thread in upgrade_threads:
            upgrade_thread.join()
        success_upgrade = True
        while not self.queue.empty():
            success_upgrade &= self.queue.get()
        if not success_upgrade:
            self.fail("Upgrade failed. See logs above!")
        self.sleep(self.expire_time)

        # rebalance in the failed over nodes
        self.cluster.rebalance(self.servers[:self.nodes_init], [], [])
        # Master must move to an already-upgraded node before the first
        # half (which contains the current master) is failed over.
        self._new_master(self.servers[self.swap_num_servers])
        upgrade_servers = self.servers[:self.swap_num_servers]
        # do a graceful failover of remaining nodes except 1
        self.rest.fail_over('ns_1@' + upgrade_servers[0].ip, graceful=True)
        self.sleep(timeout=60)
        self.rest.fail_over('ns_1@' + upgrade_servers[1].ip)
        self.sleep(timeout=60)
        self.rest.fail_over('ns_1@' + upgrade_servers[2].ip)
        self.sleep(timeout=60)

        for i in range(0, len(upgrade_servers) - 1):
            # set recovery type of the failed over nodes
            self.rest.set_recovery_type('ns_1@' + upgrade_servers[i].ip, "full")

        # upgrade the failed over nodes with the new version
        upgrade_threads = self._async_update(self.upgrade_versions[0], upgrade_servers[:len(upgrade_servers) - 1])
        # wait upgrade statuses
        for upgrade_thread in upgrade_threads:
            upgrade_thread.join()
        success_upgrade = True
        while not self.queue.empty():
            success_upgrade &= self.queue.get()
        if not success_upgrade:
            self.fail("Upgrade failed. See logs above!")
        self.sleep(self.expire_time)

        # rebalance in the failed over nodes
        self.cluster.rebalance(self.servers[:self.nodes_init], [], [])
        self.sleep(timeout=60)

        # failover another node, this is done so that the conditions of graceful failover are met, otherwise
        # hard failover will be implemented
        self.rest.fail_over('ns_1@' + upgrade_servers[3].ip, graceful=True)
        self.sleep(timeout=60)
        # set recovery type of the failed over node
        self.rest.set_recovery_type('ns_1@' + upgrade_servers[3].ip, "full")
        self.sleep(timeout=60)

        # upgrade the failed over node with the new version
        upgrade_threads = self._async_update(self.upgrade_versions[0], [upgrade_servers[len(upgrade_servers) - 1]])
        # wait upgrade statuses
        for upgrade_thread in upgrade_threads:
            upgrade_thread.join()
        success_upgrade = True
        while not self.queue.empty():
            success_upgrade &= self.queue.get()
        if not success_upgrade:
            self.fail("Upgrade failed. See logs above!")
        self.sleep(self.expire_time)
        # rebalance in the failed over node
        self.cluster.rebalance(self.servers[:self.nodes_init], [], [])
        self.n1ql_server = self.get_nodes_from_services_map(
            service_type="n1ql")
        # Run the post_upgrade operations
        self._create_ephemeral_buckets()
        self.post_upgrade(self.servers[:self.nodes_init])
        # Add new services after the upgrade
        if after_upgrade_services_in is not False:
            for upgrade_version in self.upgrade_versions:
                if self.initial_build_type == "community" and self.upgrade_build_type == "enterprise":
                    upgrade_threads = self._async_update(upgrade_version, [self.servers[self.nodes_init]],
                                                         save_upgrade_config=True)
                else:
                    upgrade_threads = self._async_update(upgrade_version, [self.servers[self.nodes_init]])

                for upgrade_thread in upgrade_threads:
                    upgrade_thread.join()
                success_upgrade = True
                while not self.queue.empty():
                    success_upgrade &= self.queue.get()
                if not success_upgrade:
                    self.fail("Upgrade failed. See logs above!")
                self.sleep(self.expire_time)
                self.cluster.rebalance(self.servers[:self.nodes_init], [self.servers[self.nodes_init]], [],
                                       services=["kv", "index", "n1ql"])
        # creating new buckets after upgrade
        if after_upgrade_buckets_in is not False:
            self.bucket_size = 100
            self._create_sasl_buckets(self.master, 1)
            self._create_standard_buckets(self.master, 1)
            if self.ddocs_num:
                self.create_ddocs_and_views()
                gen_load = BlobGenerator('upgrade', 'upgrade-', self.value_size, end=self.num_items)
                self._load_all_buckets(self.master, gen_load, "create", self.expire_time, flag=self.item_flag)
        # deleting buckets after upgrade
        if after_upgrade_buckets_out is not False:
            self._all_buckets_delete(self.master)
        # flushing buckets after upgrade
        if after_upgrade_buckets_flush is not False:
            self._all_buckets_flush()
1051
1052    def test_xdcr_upgrade_with_services(self):
1053        after_upgrade_services_in = self.input.param("after_upgrade_services_in", False)
1054        after_upgrade_buckets_in = self.input.param("after_upgrade_buckets_in", False)
1055        after_upgrade_buckets_out = self.input.param("after_upgrade_buckets_out", False)
1056        after_upgrade_buckets_flush = self.input.param("after_upgrade_buckets_flush", False)
1057
1058        # Install initial version on the specified nodes
1059        self._install(self.servers[:self.nodes_init])
1060        # Configure the nodes with services on cluster1
1061        self.operations(self.servers[:self.nodes_init], services="kv,kv,index,n1ql")
1062        # get the n1ql node which will be used in pre,during and post upgrade for running n1ql commands
1063        self.n1ql_server = self.get_nodes_from_services_map(
1064            service_type="n1ql")
1065        # Run the pre upgrade operations, typically creating index
1066        self.pre_upgrade(self.servers[:self.nodes_init])
1067        if self.ddocs_num:
1068            self.create_ddocs_and_views()
1069        # set the upgrade version
1070        self.initial_version = self.upgrade_versions[0]
1071        # install new version on another set of nodes
1072        self._install(self.servers[self.nodes_init:self.num_servers])
1073        self.master = self.servers[self.nodes_init]
1074        # Configure the nodes with services on the other cluster2
1075        try:
1076            self.operations(self.servers[self.nodes_init:self.num_servers], services="kv,kv,index,n1ql")
1077            self.sleep(timeout=300)
1078        except Exception, ex:
1079            # do nothing, the bucket is created
1080            self.log.info("bucket is created")
1081
1082        # create a xdcr relationship between cluster1 and cluster2
1083        rest_src = RestConnection(self.servers[0])
1084        rest_src.add_remote_cluster(self.servers[self.nodes_init].ip, self.servers[self.nodes_init].port,
1085                                    'Administrator', 'password', "C2")
1086        repl_id = rest_src.start_replication('continuous', 'default', "C2")
1087        if repl_id is not None:
1088            self.log.info("Replication created successfully")
1089        # Run the post_upgrade operations
1090        self._create_ephemeral_buckets()
1091        self.post_upgrade(self.servers[:self.nodes_init])
1092        # Add new services after the upgrade
1093        if after_upgrade_services_in is not False:
1094            for upgrade_version in self.upgrade_versions:
1095                if self.initial_build_type == "community" and self.upgrade_build_type == "enterprise":
1096                    upgrade_threads = self._async_update(upgrade_version, [self.servers[self.nodes_init]],
1097                                                         save_upgrade_config=True)
1098                else:
1099                    upgrade_threads = self._async_update(upgrade_version, [self.servers[self.nodes_init]])
1100
1101                for upgrade_thread in upgrade_threads:
1102                    upgrade_thread.join()
1103                success_upgrade = True
1104                while not self.queue.empty():
1105                    success_upgrade &= self.queue.get()
1106                if not success_upgrade:
1107                    self.fail("Upgrade failed. See logs above!")
1108                self.sleep(120)
1109                self.cluster.rebalance(self.servers[:self.nodes_init], [self.servers[self.nodes_init]], [],
1110                                       services=["kv", "index", "n1ql"])
1111        # creating new buckets after upgrade
1112        if after_upgrade_buckets_in is not False:
1113            self.bucket_size = 100
1114            self._create_sasl_buckets(self.master, 1)
1115            self._create_standard_buckets(self.master, 1)
1116            if self.ddocs_num:
1117                self.create_ddocs_and_views()
1118                gen_load = BlobGenerator('upgrade', 'upgrade-', self.value_size, end=self.num_items)
1119                self._load_all_buckets(self.master, gen_load, "create", self.expire_time, flag=self.item_flag)
1120        # deleting buckets after upgrade
1121        if after_upgrade_buckets_out is not False:
1122            self._all_buckets_delete(self.master)
1123        # flushing buckets after upgrade
1124        if after_upgrade_buckets_flush is not False:
1125            self._all_buckets_flush()
1126
1127    def test_offline_upgrade_with_services(self):
1128        after_upgrade_services_in = self.input.param("after_upgrade_services_in", False)
1129        after_upgrade_buckets_in = self.input.param("after_upgrade_buckets_in", False)
1130        after_upgrade_buckets_out = self.input.param("after_upgrade_buckets_out", False)
1131        after_upgrade_buckets_flush = self.input.param("after_upgrade_buckets_flush", False)
1132
1133        # Install initial version on the specified nodes
1134        self._install(self.servers[:self.nodes_init])
1135        # Configure the nodes with services on cluster
1136        self.operations(self.servers[:self.nodes_init], services="kv,index,n1ql,kv,kv,index,n1ql,kv")
1137        # get the n1ql node which will be used in pre,during and post upgrade for running n1ql commands
1138        self.n1ql_server = self.get_nodes_from_services_map(
1139            service_type="n1ql")
1140        # Run the pre upgrade operations, typically creating index
1141        self.pre_upgrade(self.servers[:self.nodes_init])
1142        seqno_expected = 1
1143        if self.ddocs_num:
1144            self.create_ddocs_and_views()
1145            if self.input.param('run_index', False):
1146                self.verify_all_queries()
1147        if not self.initial_version.startswith("1.") and self.input.param('check_seqno', True):
1148            self.check_seqno(seqno_expected)
1149        if self.during_ops:
1150            for opn in self.during_ops:
1151                if opn != 'add_back_failover':
1152                    getattr(self, opn)()
1153        num_stoped_nodes = self.input.param('num_stoped_nodes', self.nodes_init)
1154        # upgrade all the nodes in the cluster offline
1155        upgrade_nodes = self.servers[self.nodes_init - num_stoped_nodes:self.nodes_init]
1156        for upgrade_version in self.upgrade_versions:
1157            self.sleep(self.sleep_time, "Pre-setup of old version is done. Wait for upgrade to {0} version". \
1158                       format(upgrade_version))
1159            for server in upgrade_nodes:
1160                remote = RemoteMachineShellConnection(server)
1161                remote.stop_server()
1162                self.sleep(self.sleep_time)
1163                if self.wait_expire:
1164                    self.sleep(self.expire_time)
1165                if self.input.param('remove_manifest_files', False):
1166                    for file in ['manifest.txt', 'manifest.xml', 'VERSION.txt,']:
1167                        output, error = remote.execute_command("rm -rf /opt/couchbase/{0}".format(file))
1168                        remote.log_command_output(output, error)
1169                if self.input.param('remove_config_files', False):
1170                    for file in ['config', 'couchbase-server.node', 'ip', 'couchbase-server.cookie']:
1171                        output, error = remote.execute_command(
1172                            "rm -rf /opt/couchbase/var/lib/couchbase/{0}".format(file))
1173                        remote.log_command_output(output, error)
1174                    self.buckets = []
1175                remote.disconnect()
1176            if self.initial_build_type == "community" and self.upgrade_build_type == "enterprise":
1177                upgrade_threads = self._async_update(upgrade_version, upgrade_nodes, save_upgrade_config=True)
1178            else:
1179                upgrade_threads = self._async_update(upgrade_version, upgrade_nodes)
1180            # wait upgrade statuses
1181            for upgrade_thread in upgrade_threads:
1182                upgrade_thread.join()
1183            success_upgrade = True
1184            while not self.queue.empty():
1185                success_upgrade &= self.queue.get()
1186            if not success_upgrade:
1187                self.fail("Upgrade failed. See logs above!")
1188            self.sleep(self.expire_time)
1189        self.dcp_rebalance_in_offline_upgrade_from_version2()
1190        self.n1ql_server = self.get_nodes_from_services_map(
1191            service_type="n1ql")
1192        # Run the post_upgrade operations
1193        self._create_ephemeral_buckets()
1194        self.post_upgrade(self.servers[:self.nodes_init])
1195        self.check_seqno(seqno_expected)
1196        # Add new services after the upgrade
1197        if after_upgrade_services_in is not False:
1198            for upgrade_version in self.upgrade_versions:
1199                if self.initial_build_type == "community" and self.upgrade_build_type == "enterprise":
1200                    upgrade_threads = self._async_update(upgrade_version, [self.servers[self.nodes_init]],
1201                                                         save_upgrade_config=True)
1202                else:
1203                    upgrade_threads = self._async_update(upgrade_version, [self.servers[self.nodes_init]])
1204
1205                for upgrade_thread in upgrade_threads:
1206                    upgrade_thread.join()
1207                success_upgrade = True
1208                while not self.queue.empty():
1209                    success_upgrade &= self.queue.get()
1210                if not success_upgrade:
1211                    self.fail("Upgrade failed. See logs above!")
1212                self.sleep(self.expire_time)
1213                self.cluster.rebalance(self.servers[:self.nodes_init], [self.servers[self.nodes_init]], [],
1214                                       services=["kv", "index", "n1ql"])
1215        # creating new buckets after upgrade
1216        if after_upgrade_buckets_in is not False:
1217            self.bucket_size = 100
1218            self._create_sasl_buckets(self.master, 1)
1219            self._create_standard_buckets(self.master, 1)
1220            if self.ddocs_num:
1221                self.create_ddocs_and_views()
1222                gen_load = BlobGenerator('upgrade', 'upgrade-', self.value_size, end=self.num_items)
1223                self._load_all_buckets(self.master, gen_load, "create", self.expire_time, flag=self.item_flag)
1224        # deleting buckets after upgrade
1225        if after_upgrade_buckets_out is not False:
1226            self._all_buckets_delete(self.master)
1227        # flushing buckets after upgrade
1228        if after_upgrade_buckets_flush is not False:
1229            self._all_buckets_flush()
1230
1231    def test_offline_upgrade_n1ql_service_in_new_version(self):
1232        after_upgrade_services_in = self.input.param("after_upgrade_services_in", False)
1233        after_upgrade_buckets_in = self.input.param("after_upgrade_buckets_in", False)
1234        after_upgrade_buckets_out = self.input.param("after_upgrade_buckets_out", False)
1235        after_upgrade_buckets_flush = self.input.param("after_upgrade_buckets_flush", False)
1236
1237        # Install initial version on the specified nodes
1238        self._install(self.servers[:3])
1239        # Configure the nodes with services on cluster
1240        self.operations(self.servers[:3], services="kv,index,n1ql")
1241        # get the n1ql node which will be used in pre,during and post upgrade for running n1ql commands
1242        self.n1ql_server = self.get_nodes_from_services_map(
1243            service_type="n1ql")
1244        # Run the pre upgrade operations, typically creating index
1245        self.pre_upgrade(self.servers[:3])
1246        seqno_expected = 1
1247        if self.ddocs_num:
1248            self.create_ddocs_and_views()
1249            if self.input.param('run_index', False):
1250                self.verify_all_queries()
1251        if not self.initial_version.startswith("1.") and self.input.param('check_seqno', True):
1252            self.check_seqno(seqno_expected)
1253        if self.during_ops:
1254            for opn in self.during_ops:
1255                if opn != 'add_back_failover':
1256                    getattr(self, opn)()
1257        upgrade_nodes = [self.servers[2]]
1258        # upgrade n1ql node to new version
1259        for upgrade_version in self.upgrade_versions:
1260            self.sleep(self.sleep_time, "Pre-setup of old version is done. Wait for upgrade to {0} version". \
1261                       format(upgrade_version))
1262            for server in upgrade_nodes:
1263                remote = RemoteMachineShellConnection(server)
1264                remote.stop_server()
1265                self.sleep(self.sleep_time)
1266                if self.wait_expire:
1267                    self.sleep(self.expire_time)
1268                if self.input.param('remove_manifest_files', False):
1269                    for file in ['manifest.txt', 'manifest.xml', 'VERSION.txt,']:
1270                        output, error = remote.execute_command("rm -rf /opt/couchbase/{0}".format(file))
1271                        remote.log_command_output(output, error)
1272                if self.input.param('remove_config_files', False):
1273                    for file in ['config', 'couchbase-server.node', 'ip', 'couchbase-server.cookie']:
1274                        output, error = remote.execute_command(
1275                            "rm -rf /opt/couchbase/var/lib/couchbase/{0}".format(file))
1276                        remote.log_command_output(output, error)
1277                    self.buckets = []
1278                remote.disconnect()
1279
1280            if self.initial_build_type == "community" and self.upgrade_build_type == "enterprise":
1281                upgrade_threads = self._async_update(upgrade_version, upgrade_nodes, save_upgrade_config=True)
1282            else:
1283                upgrade_threads = self._async_update(upgrade_version, upgrade_nodes)
1284            # wait upgrade statuses
1285            for upgrade_thread in upgrade_threads:
1286                upgrade_thread.join()
1287            success_upgrade = True
1288            while not self.queue.empty():
1289                success_upgrade &= self.queue.get()
1290            if not success_upgrade:
1291                self.fail("Upgrade failed. See logs above!")
1292            self.sleep(self.expire_time)
1293        self.dcp_rebalance_in_offline_upgrade_from_version2()
1294        # Run the post_upgrade operations
1295        self._create_ephemeral_buckets()
1296        self.post_upgrade(self.servers[:3])
1297        # Add new services after the upgrade
1298        if after_upgrade_services_in is not False:
1299            for upgrade_version in self.upgrade_versions:
1300                if self.initial_build_type == "community" and self.upgrade_build_type == "enterprise":
1301                    upgrade_threads = self._async_update(upgrade_version, [self.servers[self.nodes_init]],
1302                                                         save_upgrade_config=True)
1303                else:
1304                    upgrade_threads = self._async_update(upgrade_version, [self.servers[self.nodes_init]])
1305
1306                for upgrade_thread in upgrade_threads:
1307                    upgrade_thread.join()
1308                success_upgrade = True
1309                while not self.queue.empty():
1310                    success_upgrade &= self.queue.get()
1311                if not success_upgrade:
1312                    self.fail("Upgrade failed. See logs above!")
1313                self.sleep(self.expire_time)
1314                self.cluster.rebalance(self.servers[:self.nodes_init], [self.servers[self.nodes_init]], [],
1315                                       services=["kv", "index", "n1ql"])
1316        # creating new buckets after upgrade
1317        if after_upgrade_buckets_in is not False:
1318            self.bucket_size = 100
1319            self._create_sasl_buckets(self.master, 1)
1320            self._create_standard_buckets(self.master, 1)
1321            if self.ddocs_num:
1322                self.create_ddocs_and_views()
1323                gen_load = BlobGenerator('upgrade', 'upgrade-', self.value_size, end=self.num_items)
1324                self._load_all_buckets(self.master, gen_load, "create", self.expire_time, flag=self.item_flag)
1325        # deleting buckets after upgrade
1326        if after_upgrade_buckets_out is not False:
1327            self._all_buckets_delete(self.master)
1328        # flushing buckets after upgrade
1329        if after_upgrade_buckets_flush is not False:
1330            self._all_buckets_flush()
1331
1332    def test_offline_upgrade_index_service_in_new_version(self):
1333        after_upgrade_services_in = self.input.param("after_upgrade_services_in", False)
1334        after_upgrade_buckets_in = self.input.param("after_upgrade_buckets_in", False)
1335        after_upgrade_buckets_out = self.input.param("after_upgrade_buckets_out", False)
1336        after_upgrade_buckets_flush = self.input.param("after_upgrade_buckets_flush", False)
1337
1338        # Install initial version on the specified nodes
1339        self._install(self.servers[:3])
1340        # Configure the nodes with services on cluster
1341        self.operations(self.servers[:3], services="kv,n1ql,index")
1342        # get the n1ql node which will be used in pre,during and post upgrade for running n1ql commands
1343        self.n1ql_server = self.get_nodes_from_services_map(
1344            service_type="n1ql")
1345        # Run the pre upgrade operations, typically creating index
1346        self.pre_upgrade(self.servers[:3])
1347        seqno_expected = 1
1348        if self.ddocs_num:
1349            self.create_ddocs_and_views()
1350            if self.input.param('run_index', False):
1351                self.verify_all_queries()
1352        if not self.initial_version.startswith("1.") and self.input.param('check_seqno', True):
1353            self.check_seqno(seqno_expected)
1354        if self.during_ops:
1355            for opn in self.during_ops:
1356                if opn != 'add_back_failover':
1357                    getattr(self, opn)()
1358        # upgrade index node to new version
1359        upgrade_nodes = [self.servers[2]]
1360        for upgrade_version in self.upgrade_versions:
1361            self.sleep(self.sleep_time, "Pre-setup of old version is done. Wait for upgrade to {0} version". \
1362                       format(upgrade_version))
1363            for server in upgrade_nodes:
1364                remote = RemoteMachineShellConnection(server)
1365                remote.stop_server()
1366                self.sleep(self.sleep_time)
1367                if self.wait_expire:
1368                    self.sleep(self.expire_time)
1369                if self.input.param('remove_manifest_files', False):
1370                    for file in ['manifest.txt', 'manifest.xml', 'VERSION.txt,']:
1371                        output, error = remote.execute_command("rm -rf /opt/couchbase/{0}".format(file))
1372                        remote.log_command_output(output, error)
1373                if self.input.param('remove_config_files', False):
1374                    for file in ['config', 'couchbase-server.node', 'ip', 'couchbase-server.cookie']:
1375                        output, error = remote.execute_command(
1376                            "rm -rf /opt/couchbase/var/lib/couchbase/{0}".format(file))
1377                        remote.log_command_output(output, error)
1378                    self.buckets = []
1379                remote.disconnect()
1380
1381            if self.initial_build_type == "community" and self.upgrade_build_type == "enterprise":
1382                upgrade_threads = self._async_update(upgrade_version, upgrade_nodes, save_upgrade_config=True)
1383            else:
1384                upgrade_threads = self._async_update(upgrade_version, upgrade_nodes)
1385            # wait upgrade statuses
1386            for upgrade_thread in upgrade_threads:
1387                upgrade_thread.join()
1388            success_upgrade = True
1389            while not self.queue.empty():
1390                success_upgrade &= self.queue.get()
1391            if not success_upgrade:
1392                self.fail("Upgrade failed. See logs above!")
1393            self.sleep(self.expire_time)
1394        self.dcp_rebalance_in_offline_upgrade_from_version2()
1395        # Run the post_upgrade operations
1396        self._create_ephemeral_buckets()
1397        self.post_upgrade(self.servers[:3])
1398        # Add new services after the upgrade
1399        if after_upgrade_services_in is not False:
1400            for upgrade_version in self.upgrade_versions:
1401                if self.initial_build_type == "community" and self.upgrade_build_type == "enterprise":
1402                    upgrade_threads = self._async_update(upgrade_version, [self.servers[3]], save_upgrade_config=True)
1403                else:
1404                    upgrade_threads = self._async_update(upgrade_version, [self.servers[3]])
1405
1406                for upgrade_thread in upgrade_threads:
1407                    upgrade_thread.join()
1408                success_upgrade = True
1409                while not self.queue.empty():
1410                    success_upgrade &= self.queue.get()
1411                if not success_upgrade:
1412                    self.fail("Upgrade failed. See logs above!")
1413                self.sleep(self.expire_time)
1414                self.cluster.rebalance(self.servers[:3], [self.servers[3]], [],
1415                                       services=["kv", "index", "n1ql"])
1416        # creating new buckets after upgrade
1417        if after_upgrade_buckets_in is not False:
1418            self.bucket_size = 100
1419            self._create_sasl_buckets(self.master, 1)
1420            self._create_standard_buckets(self.master, 1)
1421            if self.ddocs_num:
1422                self.create_ddocs_and_views()
1423                gen_load = BlobGenerator('upgrade', 'upgrade-', self.value_size, end=self.num_items)
1424                self._load_all_buckets(self.master, gen_load, "create", self.expire_time, flag=self.item_flag)
1425        # deleting buckets after upgrade
1426        if after_upgrade_buckets_out is not False:
1427            self._all_buckets_delete(self.master)
1428        # flushing buckets after upgrade
1429        if after_upgrade_buckets_flush is not False:
1430            self._all_buckets_flush()
1431
1432    def test_offline_upgrade_kv_service_in_new_version(self):
1433        after_upgrade_services_in = self.input.param("after_upgrade_services_in", False)
1434        after_upgrade_buckets_in = self.input.param("after_upgrade_buckets_in", False)
1435        after_upgrade_buckets_out = self.input.param("after_upgrade_buckets_out", False)
1436        after_upgrade_buckets_flush = self.input.param("after_upgrade_buckets_flush", False)
1437
1438        # Install initial version on the specified nodes
1439        self._install(self.servers[:3])
1440        # Configure the nodes with services
1441        self.operations(self.servers[:3], services="kv,n1ql,index")
1442        # get the n1ql node which will be used in pre,during and post upgrade for running n1ql commands
1443        self.n1ql_server = self.get_nodes_from_services_map(
1444            service_type="n1ql")
1445        # Run the pre upgrade operations, typically creating index
1446        self.pre_upgrade(self.servers[:3])
1447        seqno_expected = 1
1448        if self.ddocs_num:
1449            self.create_ddocs_and_views()
1450            if self.input.param('run_index', False):
1451                self.verify_all_queries()
1452        if not self.initial_version.startswith("1.") and self.input.param('check_seqno', True):
1453            self.check_seqno(seqno_expected)
1454        if self.during_ops:
1455            for opn in self.during_ops:
1456                if opn != 'add_back_failover':
1457                    getattr(self, opn)()
1458        # upgrade kv node to new version
1459        upgrade_nodes = [self.servers[0]]
1460        for upgrade_version in self.upgrade_versions:
1461            self.sleep(self.sleep_time, "Pre-setup of old version is done. Wait for upgrade to {0} version". \
1462                       format(upgrade_version))
1463            for server in upgrade_nodes:
1464                remote = RemoteMachineShellConnection(server)
1465                remote.stop_server()
1466                self.sleep(self.sleep_time)
1467                if self.wait_expire:
1468                    self.sleep(self.expire_time)
1469                if self.input.param('remove_manifest_files', False):
1470                    for file in ['manifest.txt', 'manifest.xml', 'VERSION.txt,']:
1471                        output, error = remote.execute_command("rm -rf /opt/couchbase/{0}".format(file))
1472                        remote.log_command_output(output, error)
1473                if self.input.param('remove_config_files', False):
1474                    for file in ['config', 'couchbase-server.node', 'ip', 'couchbase-server.cookie']:
1475                        output, error = remote.execute_command(
1476                            "rm -rf /opt/couchbase/var/lib/couchbase/{0}".format(file))
1477                        remote.log_command_output(output, error)
1478                    self.buckets = []
1479                remote.disconnect()
1480
1481            if self.initial_build_type == "community" and self.upgrade_build_type == "enterprise":
1482                upgrade_threads = self._async_update(upgrade_version, upgrade_nodes, save_upgrade_config=True)
1483            else:
1484                upgrade_threads = self._async_update(upgrade_version, upgrade_nodes)
1485            # wait upgrade statuses
1486            for upgrade_thread in upgrade_threads:
1487                upgrade_thread.join()
1488            success_upgrade = True
1489            while not self.queue.empty():
1490                success_upgrade &= self.queue.get()
1491            if not success_upgrade:
1492                self.fail("Upgrade failed. See logs above!")
1493            self.sleep(self.expire_time)
1494        self.dcp_rebalance_in_offline_upgrade_from_version2()
1495        # Run the post_upgrade operations
1496        self._create_ephemeral_buckets()
1497        self.post_upgrade(self.servers[:3])
1498        # Add new services after the upgrade
1499        if after_upgrade_services_in is not False:
1500            for upgrade_version in self.upgrade_versions:
1501                if self.initial_build_type == "community" and self.upgrade_build_type == "enterprise":
1502                    upgrade_threads = self._async_update(upgrade_version, [self.servers[3]], save_upgrade_config=True)
1503                else:
1504                    upgrade_threads = self._async_update(upgrade_version, [self.servers[3]])
1505
1506                for upgrade_thread in upgrade_threads:
1507                    upgrade_thread.join()
1508                success_upgrade = True
1509                while not self.queue.empty():
1510                    success_upgrade &= self.queue.get()
1511                if not success_upgrade:
1512                    self.fail("Upgrade failed. See logs above!")
1513                self.sleep(self.expire_time)
1514                self.cluster.rebalance(self.servers[:3], [self.servers[3]], [],
1515                                       services=["kv", "index", "n1ql"])
1516        # creating new buckets after upgrade
1517        if after_upgrade_buckets_in is not False:
1518            self.bucket_size = 100
1519            self._create_sasl_buckets(self.master, 1)
1520            self._create_standard_buckets(self.master, 1)
1521            if self.ddocs_num:
1522                self.create_ddocs_and_views()
1523                gen_load = BlobGenerator('upgrade', 'upgrade-', self.value_size, end=self.num_items)
1524                self._load_all_buckets(self.master, gen_load, "create", self.expire_time, flag=self.item_flag)
1525        # deleting buckets after upgrade
1526        if after_upgrade_buckets_out is not False:
1527            self._all_buckets_delete(self.master)
1528        # flushing buckets after upgrade
1529        if after_upgrade_buckets_flush is not False:
1530            self._all_buckets_flush()
1531
1532    def test_offline_upgrade_n1ql_index_service_in_new_version(self):
1533        after_upgrade_services_in = self.input.param("after_upgrade_services_in", False)
1534        after_upgrade_buckets_in = self.input.param("after_upgrade_buckets_in", False)
1535        after_upgrade_buckets_out = self.input.param("after_upgrade_buckets_out", False)
1536        after_upgrade_buckets_flush = self.input.param("after_upgrade_buckets_flush", False)
1537
1538        # Install initial version on the specified nodes
1539        self._install(self.servers[:3])
1540        # Configure the nodes with services
1541        self.operations(self.servers[:3], services="kv,n1ql,index")
1542        # get the n1ql node which will be used in pre,during and post upgrade for running n1ql commands
1543        self.n1ql_server = self.get_nodes_from_services_map(
1544            service_type="n1ql")
1545        # Run the pre upgrade operations, typically creating index
1546        self.pre_upgrade(self.servers[:3])
1547        seqno_expected = 1
1548        if self.ddocs_num:
1549            self.create_ddocs_and_views()
1550            if self.input.param('run_index', False):
1551                self.verify_all_queries()
1552        if not self.initial_version.startswith("1.") and self.input.param('check_seqno', True):
1553            self.check_seqno(seqno_expected)
1554        if self.during_ops:
1555            for opn in self.during_ops:
1556                if opn != 'add_back_failover':
1557                    getattr(self, opn)()
1558        # upgrade n1ql and index nodes to new version
1559        upgrade_nodes = self.servers[1:3]
1560        for upgrade_version in self.upgrade_versions:
1561            self.sleep(self.sleep_time, "Pre-setup of old version is done. Wait for upgrade to {0} version". \
1562                       format(upgrade_version))
1563            for server in upgrade_nodes:
1564                remote = RemoteMachineShellConnection(server)
1565                remote.stop_server()
1566                self.sleep(self.sleep_time)
1567                if self.wait_expire:
1568                    self.sleep(self.expire_time)
1569                if self.input.param('remove_manifest_files', False):
1570                    for file in ['manifest.txt', 'manifest.xml', 'VERSION.txt,']:
1571                        output, error = remote.execute_command("rm -rf /opt/couchbase/{0}".format(file))
1572                        remote.log_command_output(output, error)
1573                if self.input.param('remove_config_files', False):
1574                    for file in ['config', 'couchbase-server.node', 'ip', 'couchbase-server.cookie']:
1575                        output, error = remote.execute_command(
1576                            "rm -rf /opt/couchbase/var/lib/couchbase/{0}".format(file))
1577                        remote.log_command_output(output, error)
1578                    self.buckets = []
1579                remote.disconnect()
1580
1581            if self.initial_build_type == "community" and self.upgrade_build_type == "enterprise":
1582                upgrade_threads = self._async_update(upgrade_version, upgrade_nodes, save_upgrade_config=True)
1583            else:
1584                upgrade_threads = self._async_update(upgrade_version, upgrade_nodes)
1585            # wait upgrade statuses
1586            for upgrade_thread in upgrade_threads:
1587                upgrade_thread.join()
1588            success_upgrade = True
1589            while not self.queue.empty():
1590                success_upgrade &= self.queue.get()
1591            if not success_upgrade:
1592                self.fail("Upgrade failed. See logs above!")
1593            self.sleep(self.expire_time)
1594        self.dcp_rebalance_in_offline_upgrade_from_version2()
1595        # Run the post_upgrade operations
1596        self._create_ephemeral_buckets()
1597        self.post_upgrade(self.servers[:3])
1598        # Add new services after the upgrade
1599        if after_upgrade_services_in is not False:
1600            for upgrade_version in self.upgrade_versions:
1601                if self.initial_build_type == "community" and self.upgrade_build_type == "enterprise":
1602                    upgrade_threads = self._async_update(upgrade_version, [self.servers[3]], save_upgrade_config=True)
1603                else:
1604                    upgrade_threads = self._async_update(upgrade_version, [self.servers[3]])
1605
1606                for upgrade_thread in upgrade_threads:
1607                    upgrade_thread.join()
1608                success_upgrade = True
1609                while not self.queue.empty():
1610                    success_upgrade &= self.queue.get()
1611                if not success_upgrade:
1612                    self.fail("Upgrade failed. See logs above!")
1613                self.sleep(self.expire_time)
1614                self.cluster.rebalance(self.servers[:3], [self.servers[3]], [],
1615                                       services=["kv", "index", "n1ql"])
1616        # creating new buckets after upgrade
1617        if after_upgrade_buckets_in is not False:
1618            self.bucket_size = 100
1619            self._create_sasl_buckets(self.master, 1)
1620            self._create_standard_buckets(self.master, 1)
1621            if self.ddocs_num:
1622                self.create_ddocs_and_views()
1623                gen_load = BlobGenerator('upgrade', 'upgrade-', self.value_size, end=self.num_items)
1624                self._load_all_buckets(self.master, gen_load, "create", self.expire_time, flag=self.item_flag)
1625        # deleting buckets after upgrade
1626        if after_upgrade_buckets_out is not False:
1627            self._all_buckets_delete(self.master)
1628        # flushing buckets after upgrade
1629        if after_upgrade_buckets_flush is not False:
1630            self._all_buckets_flush()
1631
1632    def test_offline_upgrade_kv_index_service_in_new_version(self):
1633        after_upgrade_services_in = self.input.param("after_upgrade_services_in", False)
1634        after_upgrade_buckets_in = self.input.param("after_upgrade_buckets_in", False)
1635        after_upgrade_buckets_out = self.input.param("after_upgrade_buckets_out", False)
1636        after_upgrade_buckets_flush = self.input.param("after_upgrade_buckets_flush", False)
1637
1638        # Install initial version on the specified nodes
1639        self._install(self.servers[:3])
1640        # Configure the nodes with services
1641        self.operations(self.servers[:3], services="kv,index,n1ql")
1642        # get the n1ql node which will be used in pre,during and post upgrade for running n1ql commands
1643        self.n1ql_server = self.get_nodes_from_services_map(
1644            service_type="n1ql")
1645        # Run the pre upgrade operations, typically creating index
1646        self.pre_upgrade(self.servers[:3])
1647        seqno_expected = 1
1648        if self.ddocs_num:
1649            self.create_ddocs_and_views()
1650            if self.input.param('run_index', False):
1651                self.verify_all_queries()
1652        if not self.initial_version.startswith("1.") and self.input.param('check_seqno', True):
1653            self.check_seqno(seqno_expected)
1654        if self.during_ops:
1655            for opn in self.during_ops:
1656                if opn != 'add_back_failover':
1657                    getattr(self, opn)()
1658        # upgrade kv and index nodes to new version
1659        upgrade_nodes = self.servers[0:2]
1660        for upgrade_version in self.upgrade_versions:
1661            self.sleep(self.sleep_time, "Pre-setup of old version is done. Wait for upgrade to {0} version". \
1662                       format(upgrade_version))
1663            for server in upgrade_nodes:
1664                remote = RemoteMachineShellConnection(server)
1665                remote.stop_server()
1666                self.sleep(self.sleep_time)
1667                if self.wait_expire:
1668                    self.sleep(self.expire_time)
1669                if self.input.param('remove_manifest_files', False):
1670                    for file in ['manifest.txt', 'manifest.xml', 'VERSION.txt,']:
1671                        output, error = remote.execute_command("rm -rf /opt/couchbase/{0}".format(file))
1672                        remote.log_command_output(output, error)
1673                if self.input.param('remove_config_files', False):
1674                    for file in ['config', 'couchbase-server.node', 'ip', 'couchbase-server.cookie']:
1675                        output, error = remote.execute_command(
1676                            "rm -rf /opt/couchbase/var/lib/couchbase/{0}".format(file))
1677                        remote.log_command_output(output, error)
1678                    self.buckets = []
1679                remote.disconnect()
1680
1681            if self.initial_build_type == "community" and self.upgrade_build_type == "enterprise":
1682                upgrade_threads = self._async_update(upgrade_version, upgrade_nodes, save_upgrade_config=True)
1683            else:
1684                upgrade_threads = self._async_update(upgrade_version, upgrade_nodes)
1685            # wait upgrade statuses
1686            for upgrade_thread in upgrade_threads:
1687                upgrade_thread.join()
1688            success_upgrade = True
1689            while not self.queue.empty():
1690                success_upgrade &= self.queue.get()
1691            if not success_upgrade:
1692                self.fail("Upgrade failed. See logs above!")
1693            self.sleep(self.expire_time)
1694        self.dcp_rebalance_in_offline_upgrade_from_version2()
1695        # Run the post_upgrade operations
1696        self._create_ephemeral_buckets()
1697        self.post_upgrade(self.servers[:3])
1698        # Add new services after the upgrade
1699        if after_upgrade_services_in is not False:
1700            for upgrade_version in self.upgrade_versions:
1701                if self.initial_build_type == "community" and self.upgrade_build_type == "enterprise":
1702                    upgrade_threads = self._async_update(upgrade_version, [self.servers[3]], save_upgrade_config=True)
1703                else:
1704                    upgrade_threads = self._async_update(upgrade_version, [self.servers[3]])
1705
1706                for upgrade_thread in upgrade_threads:
1707                    upgrade_thread.join()
1708                success_upgrade = True
1709                while not self.queue.empty():
1710                    success_upgrade &= self.queue.get()
1711                if not success_upgrade:
1712                    self.fail("Upgrade failed. See logs above!")
1713                self.sleep(self.expire_time)
1714                self.cluster.rebalance(self.servers[:3], [self.servers[3]], [],
1715                                       services=["kv", "index", "n1ql"])
1716        # creating new buckets after upgrade
1717        if after_upgrade_buckets_in is not False:
1718            self.bucket_size = 100
1719            self._create_sasl_buckets(self.master, 1)
1720            self._create_standard_buckets(self.master, 1)
1721            if self.ddocs_num:
1722                self.create_ddocs_and_views()
1723                gen_load = BlobGenerator('upgrade', 'upgrade-', self.value_size, end=self.num_items)
1724                self._load_all_buckets(self.master, gen_load, "create", self.expire_time, flag=self.item_flag)
1725        # deleting buckets after upgrade
1726        if after_upgrade_buckets_out is not False:
1727            self._all_buckets_delete(self.master)
1728        # flushing buckets after upgrade
1729        if after_upgrade_buckets_flush is not False:
1730            self._all_buckets_flush()
1731
1732    def test_offline_upgrade_kv_n1ql_service_in_new_version(self):
1733        after_upgrade_services_in = self.input.param("after_upgrade_services_in", False)
1734        after_upgrade_buckets_in = self.input.param("after_upgrade_buckets_in", False)
1735        after_upgrade_buckets_out = self.input.param("after_upgrade_buckets_out", False)
1736        after_upgrade_buckets_flush = self.input.param("after_upgrade_buckets_flush", False)
1737
1738        # Install initial version on the specified nodes
1739        self._install(self.servers[:3])
1740        # Configure the nodes with services
1741        self.operations(self.servers[:3], services="kv,index,n1ql")
1742        # get the n1ql node which will be used in pre,during and post upgrade for running n1ql commands
1743        self.n1ql_server = self.get_nodes_from_services_map(
1744            service_type="n1ql")
1745        # Run the pre upgrade operations, typically creating index
1746        self.pre_upgrade(self.servers[:3])
1747        seqno_expected = 1
1748        if self.ddocs_num:
1749            self.create_ddocs_and_views()
1750            if self.input.param('run_index', False):
1751                self.verify_all_queries()
1752        if not self.initial_version.startswith("1.") and self.input.param('check_seqno', True):
1753            self.check_seqno(seqno_expected)
1754        if self.during_ops:
1755            for opn in self.during_ops:
1756                if opn != 'add_back_failover':
1757                    getattr(self, opn)()
1758        # upgrade kv and n1ql nodes to new version
1759        upgrade_nodes = [self.servers[0], self.servers[2]]
1760        for upgrade_version in self.upgrade_versions:
1761            self.sleep(self.sleep_time, "Pre-setup of old version is done. Wait for upgrade to {0} version". \
1762                       format(upgrade_version))
1763            for server in upgrade_nodes:
1764                remote = RemoteMachineShellConnection(server)
1765                remote.stop_server()
1766                self.sleep(self.sleep_time)
1767                if self.wait_expire:
1768                    self.sleep(self.expire_time)
1769                if self.input.param('remove_manifest_files', False):
1770                    for file in ['manifest.txt', 'manifest.xml', 'VERSION.txt,']:
1771                        output, error = remote.execute_command("rm -rf /opt/couchbase/{0}".format(file))
1772                        remote.log_command_output(output, error)
1773                if self.input.param('remove_config_files', False):
1774                    for file in ['config', 'couchbase-server.node', 'ip', 'couchbase-server.cookie']:
1775                        output, error = remote.execute_command(
1776                            "rm -rf /opt/couchbase/var/lib/couchbase/{0}".format(file))
1777                        remote.log_command_output(output, error)
1778                    self.buckets = []
1779                remote.disconnect()
1780
1781            if self.initial_build_type == "community" and self.upgrade_build_type == "enterprise":
1782                upgrade_threads = self._async_update(upgrade_version, upgrade_nodes, save_upgrade_config=True)
1783            else:
1784                upgrade_threads = self._async_update(upgrade_version, upgrade_nodes)
1785            # wait upgrade statuses
1786            for upgrade_thread in upgrade_threads:
1787                upgrade_thread.join()
1788            success_upgrade = True
1789            while not self.queue.empty():
1790                success_upgrade &= self.queue.get()
1791            if not success_upgrade:
1792                self.fail("Upgrade failed. See logs above!")
1793            self.sleep(self.expire_time)
1794        self.dcp_rebalance_in_offline_upgrade_from_version2()
1795        # Run the post_upgrade operations
1796        self._create_ephemeral_buckets()
1797        self.post_upgrade(self.servers[:3])
1798        # Add new services after the upgrade
1799        if after_upgrade_services_in is not False:
1800            for upgrade_version in self.upgrade_versions:
1801                if self.initial_build_type == "community" and self.upgrade_build_type == "enterprise":
1802                    upgrade_threads = self._async_update(upgrade_version, [self.servers[3]], save_upgrade_config=True)
1803                else:
1804                    upgrade_threads = self._async_update(upgrade_version, [self.servers[3]])
1805
1806                for upgrade_thread in upgrade_threads:
1807                    upgrade_thread.join()
1808                success_upgrade = True
1809                while not self.queue.empty():
1810                    success_upgrade &= self.queue.get()
1811                if not success_upgrade:
1812                    self.fail("Upgrade failed. See logs above!")
1813                self.sleep(self.expire_time)
1814                self.cluster.rebalance(self.servers[:3], [self.servers[3]], [],
1815                                       services=["kv", "index", "n1ql"])
1816        # creating new buckets after upgrade
1817        if after_upgrade_buckets_in is not False:
1818            self.bucket_size = 100
1819            self._create_sasl_buckets(self.master, 1)
1820            self._create_standard_buckets(self.master, 1)
1821            if self.ddocs_num:
1822                self.create_ddocs_and_views()
1823                gen_load = BlobGenerator('upgrade', 'upgrade-', self.value_size, end=self.num_items)
1824                self._load_all_buckets(self.master, gen_load, "create", self.expire_time, flag=self.item_flag)
1825        # deleting buckets after upgrade
1826        if after_upgrade_buckets_out is not False:
1827            self._all_buckets_delete(self.master)
1828        # flushing buckets after upgrade
1829        if after_upgrade_buckets_flush is not False:
1830            self._all_buckets_flush()
1831
1832    def test_offline_upgrade_n1ql_index_service_in_new_version_with_multiple_nodes(self):
1833        after_upgrade_services_in = self.input.param("after_upgrade_services_in", False)
1834        after_upgrade_buckets_in = self.input.param("after_upgrade_buckets_in", False)
1835        after_upgrade_buckets_out = self.input.param("after_upgrade_buckets_out", False)
1836        after_upgrade_buckets_flush = self.input.param("after_upgrade_buckets_flush", False)
1837
1838        self.bucket_size = 1000
1839        # Install initial version on the specified nodes
1840        self._install(self.servers[:6])
1841        # Configure the nodes with services
1842        self.operations(self.servers[:6], services="kv,kv,n1ql,index,n1ql,index")
1843        # get the n1ql node which will be used in pre,during and post upgrade for running n1ql commands
1844        self.n1ql_server = self.get_nodes_from_services_map(
1845            service_type="n1ql")
1846        # Run the pre upgrade operations, typically creating index
1847        self.pre_upgrade(self.servers[:6])
1848        seqno_expected = 1
1849        if self.ddocs_num:
1850            self.create_ddocs_and_views()
1851            if self.input.param('run_index', False):
1852                self.verify_all_queries()
1853        if not self.initial_version.startswith("1.") and self.input.param('check_seqno', True):
1854            self.check_seqno(seqno_expected)
1855        if self.during_ops:
1856            for opn in self.during_ops:
1857                if opn != 'add_back_failover':
1858                    getattr(self, opn)()
1859        # upgrade index and n1ql nodes to new version when we have multiple nodes
1860        upgrade_nodes = self.servers[4:6]
1861        for upgrade_version in self.upgrade_versions:
1862            self.sleep(self.sleep_time, "Pre-setup of old version is done. Wait for upgrade to {0} version". \
1863                       format(upgrade_version))
1864            for server in upgrade_nodes:
1865                remote = RemoteMachineShellConnection(server)
1866                remote.stop_server()
1867                self.sleep(self.sleep_time)
1868                if self.wait_expire:
1869                    self.sleep(self.expire_time)
1870                if self.input.param('remove_manifest_files', False):
1871                    for file in ['manifest.txt', 'manifest.xml', 'VERSION.txt,']:
1872                        output, error = remote.execute_command("rm -rf /opt/couchbase/{0}".format(file))
1873                        remote.log_command_output(output, error)
1874                if self.input.param('remove_config_files', False):
1875                    for file in ['config', 'couchbase-server.node', 'ip', 'couchbase-server.cookie']:
1876                        output, error = remote.execute_command(
1877                            "rm -rf /opt/couchbase/var/lib/couchbase/{0}".format(file))
1878                        remote.log_command_output(output, error)
1879                    self.buckets = []
1880                remote.disconnect()
1881
1882            if self.initial_build_type == "community" and self.upgrade_build_type == "enterprise":
1883                upgrade_threads = self._async_update(upgrade_version, upgrade_nodes, save_upgrade_config=True)
1884            else:
1885                upgrade_threads = self._async_update(upgrade_version, upgrade_nodes)
1886            # wait upgrade statuses
1887            self.n1ql_server = self.get_nodes_from_services_map(
1888                service_type="n1ql")
1889            self.during_upgrade(self.servers[:6])
1890            for upgrade_thread in upgrade_threads:
1891                upgrade_thread.join()
1892            success_upgrade = True
1893            while not self.queue.empty():
1894                success_upgrade &= self.queue.get()
1895            if not success_upgrade:
1896                self.fail("Upgrade failed. See logs above!")
1897            self.sleep(self.expire_time)
1898        self.dcp_rebalance_in_offline_upgrade_from_version2()
1899        # Run the post_upgrade operations
1900        self._create_ephemeral_buckets()
1901        self.post_upgrade(self.servers[:6])
1902        # Add new services after the upgrade
1903        if after_upgrade_services_in is not False:
1904            for upgrade_version in self.upgrade_versions:
1905                if self.initial_build_type == "community" and self.upgrade_build_type == "enterprise":
1906                    upgrade_threads = self._async_update(upgrade_version, [self.servers[6]], save_upgrade_config=True)
1907                else:
1908                    upgrade_threads = self._async_update(upgrade_version, [self.servers[6]])
1909
1910                for upgrade_thread in upgrade_threads:
1911                    upgrade_thread.join()
1912                success_upgrade = True
1913                while not self.queue.empty():
1914                    success_upgrade &= self.queue.get()
1915                if not success_upgrade:
1916                    self.fail("Upgrade failed. See logs above!")
1917                self.sleep(120)
1918                self.cluster.rebalance(self.servers[:6], [self.servers[6]], [],
1919                                       services=["kv", "index", "n1ql"])
1920        # creating new buckets after upgrade
1921        if after_upgrade_buckets_in is not False:
1922            self.bucket_size = 100
1923            self._create_sasl_buckets(self.master, 1)
1924            self._create_standard_buckets(self.master, 1)
1925            if self.ddocs_num:
1926                self.create_ddocs_and_views()
1927                gen_load = BlobGenerator('upgrade', 'upgrade-', self.value_size, end=self.num_items)
1928                self._load_all_buckets(self.master, gen_load, "create", self.expire_time, flag=self.item_flag)
1929        # deleting buckets after upgrade
1930        if after_upgrade_buckets_out is not False:
1931            self._all_buckets_delete(self.master)
1932        # flushing buckets after upgrade
1933        if after_upgrade_buckets_flush is not False:
1934            self._all_buckets_flush()
1935
1936    def test_offline_upgrade_kv_index_service_in_new_version_with_multiple_nodes(self):
1937        after_upgrade_services_in = self.input.param("after_upgrade_services_in", False)
1938        after_upgrade_buckets_in = self.input.param("after_upgrade_buckets_in", False)
1939        after_upgrade_buckets_out = self.input.param("after_upgrade_buckets_out", False)
1940        after_upgrade_buckets_flush = self.input.param("after_upgrade_buckets_flush", False)
1941
1942        self.bucket_size = 1000
1943        # Install initial version on the specified nodes
1944        self._install(self.servers[:6])
1945        # Configure the nodes with services
1946        self.operations(self.servers[:6], services="kv,index,n1ql,n1ql,kv,index")
1947        # get the n1ql node which will be used in pre,during and post upgrade for running n1ql commands
1948        self.n1ql_server = self.get_nodes_from_services_map(
1949            service_type="n1ql")
1950        # Run the pre upgrade operations, typically creating index
1951        self.pre_upgrade(self.servers[:6])
1952        seqno_expected = 1
1953        if self.ddocs_num:
1954            self.create_ddocs_and_views()
1955            if self.input.param('run_index', False):
1956                self.verify_all_queries()
1957        if not self.initial_version.startswith("1.") and self.input.param('check_seqno', True):
1958            self.check_seqno(seqno_expected)
1959        if self.during_ops:
1960            for opn in self.during_ops:
1961                if opn != 'add_back_failover':
1962                    getattr(self, opn)()
1963        # upgrade index and kv nodes to new version when we have multiple nodes
1964        upgrade_nodes = self.servers[4:6]
1965        for upgrade_version in self.upgrade_versions:
1966            self.sleep(self.sleep_time, "Pre-setup of old version is done. Wait for upgrade to {0} version". \
1967                       format(upgrade_version))
1968            for server in upgrade_nodes:
1969                remote = RemoteMachineShellConnection(server)
1970                remote.stop_server()
1971                self.sleep(self.sleep_time)
1972                if self.wait_expire:
1973                    self.sleep(self.expire_time)
1974                if self.input.param('remove_manifest_files', False):
1975                    for file in ['manifest.txt', 'manifest.xml', 'VERSION.txt,']:
1976                        output, error = remote.execute_command("rm -rf /opt/couchbase/{0}".format(file))
1977                        remote.log_command_output(output, error)
1978                if self.input.param('remove_config_files', False):
1979                    for file in ['config', 'couchbase-server.node', 'ip', 'couchbase-server.cookie']:
1980                        output, error = remote.execute_command(
1981                            "rm -rf /opt/couchbase/var/lib/couchbase/{0}".format(file))
1982                        remote.log_command_output(output, error)
1983                    self.buckets = []
1984                remote.disconnect()
1985
1986            if self.initial_build_type == "community" and self.upgrade_build_type == "enterprise":
1987                upgrade_threads = self._async_update(upgrade_version, upgrade_nodes, save_upgrade_config=True)
1988            else:
1989                upgrade_threads = self._async_update(upgrade_version, upgrade_nodes)
1990            # wait upgrade statuses
1991            self.n1ql_server = self.get_nodes_from_services_map(
1992                service_type="n1ql")
1993            self.during_upgrade(self.servers[:6])
1994            for upgrade_thread in upgrade_threads:
1995                upgrade_thread.join()
1996            success_upgrade = True
1997            while not self.queue.empty():
1998                success_upgrade &= self.queue.get()
1999            if not success_upgrade:
2000                self.fail("Upgrade failed. See logs above!")
2001            self.sleep(self.expire_time)
2002        self.dcp_rebalance_in_offline_upgrade_from_version2()
2003        # Run the post_upgrade operations
2004        self._create_ephemeral_buckets()
2005        self.post_upgrade(self.servers[:6])
2006        # Add new services after the upgrade
2007        if after_upgrade_services_in is not False:
2008            for upgrade_version in self.upgrade_versions:
2009                if self.initial_build_type == "community" and self.upgrade_build_type == "enterprise":
2010                    upgrade_threads = self._async_update(upgrade_version, [self.servers[6]], save_upgrade_config=True)
2011                else:
2012                    upgrade_threads = self._async_update(upgrade_version, [self.servers[6]])
2013
2014                for upgrade_thread in upgrade_threads:
2015                    upgrade_thread.join()
2016                success_upgrade = True
2017                while not self.queue.empty():
2018                    success_upgrade &= self.queue.get()
2019                if not success_upgrade:
2020                    self.fail("Upgrade failed. See logs above!")
2021                self.sleep(120)
2022                self.cluster.rebalance(self.servers[:6], [self.servers[6]], [],
2023                                       services=["kv", "index", "n1ql"])
2024        # creating new buckets after upgrade
2025        if after_upgrade_buckets_in is not False:
2026            self.bucket_size = 100
2027            self._create_sasl_buckets(self.master, 1)
2028            self._create_standard_buckets(self.master, 1)
2029            if self.ddocs_num:
2030                self.create_ddocs_and_views()
2031                gen_load = BlobGenerator('upgrade', 'upgrade-', self.value_size, end=self.num_items)
2032                self._load_all_buckets(self.master, gen_load, "create", self.expire_time, flag=self.item_flag)
2033        # deleting buckets after upgrade
2034        if after_upgrade_buckets_out is not False:
2035            self._all_buckets_delete(self.master)
2036        # flushing buckets after upgrade
2037        if after_upgrade_buckets_flush is not False:
2038            self._all_buckets_flush()
2039
2040    def test_offline_upgrade_kv_n1ql_service_in_new_version_with_multiple_nodes(self):
2041        after_upgrade_services_in = self.input.param("after_upgrade_services_in", False)
2042        after_upgrade_buckets_in = self.input.param("after_upgrade_buckets_in", False)
2043        after_upgrade_buckets_out = self.input.param("after_upgrade_buckets_out", False)
2044        after_upgrade_buckets_flush = self.input.param("after_upgrade_buckets_flush", False)
2045
2046        self.bucket_size = 1000
2047        # Install initial version on the specified nodes
2048        self._install(self.servers[:6])
2049        # Configure the nodes with services
2050        self.operations(self.servers[:6], services="kv,index,n1ql,index,kv,n1ql")
2051        # get the n1ql node which will be used in pre,during and post upgrade for running n1ql commands
2052        self.n1ql_server = self.get_nodes_from_services_map(
2053            service_type="n1ql")
2054        # Run the pre upgrade operations, typically creating index
2055        self.pre_upgrade(self.servers[:6])
2056        seqno_expected = 1
2057        if self.ddocs_num:
2058            self.create_ddocs_and_views()
2059            if self.input.param('run_index', False):
2060                self.verify_all_queries()
2061        if not self.initial_version.startswith("1.") and self.input.param('check_seqno', True):
2062            self.check_seqno(seqno_expected)
2063        if self.during_ops:
2064            for opn in self.during_ops:
2065                if opn != 'add_back_failover':
2066                    getattr(self, opn)()
2067        # upgrade n1ql and kv nodes to new version when we have multiple nodes
2068        upgrade_nodes = self.servers[4:6]
2069        for upgrade_version in self.upgrade_versions:
2070            self.sleep(self.sleep_time, "Pre-setup of old version is done. Wait for upgrade to {0} version". \
2071                       format(upgrade_version))
2072            for server in upgrade_nodes:
2073                remote = RemoteMachineShellConnection(server)
2074                remote.stop_server()
2075                self.sleep(self.sleep_time)
2076                if self.wait_expire:
2077                    self.sleep(self.expire_time)
2078                if self.input.param('remove_manifest_files', False):
2079                    for file in ['manifest.txt', 'manifest.xml', 'VERSION.txt,']:
2080                        output, error = remote.execute_command("rm -rf /opt/couchbase/{0}".format(file))
2081                        remote.log_command_output(output, error)
2082                if self.input.param('remove_config_files', False):
2083                    for file in ['config', 'couchbase-server.node', 'ip', 'couchbase-server.cookie']:
2084                        output, error = remote.execute_command(
2085                            "rm -rf /opt/couchbase/var/lib/couchbase/{0}".format(file))
2086                        remote.log_command_output(output, error)
2087                    self.buckets = []
2088                remote.disconnect()
2089
2090            if self.initial_build_type == "community" and self.upgrade_build_type == "enterprise":
2091                upgrade_threads = self._async_update(upgrade_version, upgrade_nodes, save_upgrade_config=True)
2092            else:
2093                upgrade_threads = self._async_update(upgrade_version, upgrade_nodes)
2094            # wait upgrade statuses
2095            self.n1ql_server = self.get_nodes_from_services_map(
2096                service_type="n1ql")
2097            self.during_upgrade(self.servers[:6])
2098            for upgrade_thread in upgrade_threads:
2099                upgrade_thread.join()
2100            success_upgrade = True
2101            while not self.queue.empty():
2102                success_upgrade &= self.queue.get()
2103            if not success_upgrade:
2104                self.fail("Upgrade failed. See logs above!")
2105            self.sleep(self.expire_time)
2106        self.dcp_rebalance_in_offline_upgrade_from_version2()
2107        # Run the post_upgrade operations
2108        self._create_ephemeral_buckets()
2109        self.post_upgrade(self.servers[:6])
2110        # Add new services after the upgrade
2111        if after_upgrade_services_in is not False:
2112            for upgrade_version in self.upgrade_versions:
2113                if self.initial_build_type == "community" and self.upgrade_build_type == "enterprise":
2114                    upgrade_threads = self._async_update(upgrade_version, [self.servers[6]], save_upgrade_config=True)
2115                else:
2116                    upgrade_threads = self._async_update(upgrade_version, [self.servers[6]])
2117
2118                for upgrade_thread in upgrade_threads:
2119                    upgrade_thread.join()
2120                success_upgrade = True
2121                while not self.queue.empty():
2122                    success_upgrade &= self.queue.get()
2123                if not success_upgrade:
2124                    self.fail("Upgrade failed. See logs above!")
2125                self.sleep(120)
2126                self.cluster.rebalance(self.servers[:6], [self.servers[6]], [],
2127                                       services=["kv", "index", "n1ql"])
2128        # creating new buckets after upgrade
2129        if after_upgrade_buckets_in is not False:
2130            self.bucket_size = 100
2131            self._create_sasl_buckets(self.master, 1)
2132            self._create_standard_buckets(self.master, 1)
2133            if self.ddocs_num:
2134                self.create_ddocs_and_views()
2135                gen_load = BlobGenerator('upgrade', 'upgrade-', self.value_size, end=self.num_items)
2136                self._load_all_buckets(self.master, gen_load, "create", self.expire_time, flag=self.item_flag)
2137        # deleting buckets after upgrade
2138        if after_upgrade_buckets_out is not False:
2139            self._all_buckets_delete(self.master)
2140        # flushing buckets after upgrade
2141        if after_upgrade_buckets_flush is not False:
2142            self._all_buckets_f