1import copy, json
2import re
3import testconstants
4import gc
5import sys
6import traceback
7import Queue
8from threading import Thread
9from mc_bin_client import MemcachedError
10from memcached.helper.data_helper import VBucketAwareMemcached, MemcachedClientHelper
11from membase.helper.bucket_helper import BucketOperationHelper
12from membase.api.rest_client import RestConnection, RestHelper, Bucket
13from membase.helper.cluster_helper import ClusterOperationHelper
14from remote.remote_util import RemoteMachineShellConnection, RemoteUtilHelper
15from couchbase_helper.document import DesignDocument, View
16from couchbase_helper.documentgenerator import BlobGenerator
17from query_tests_helper import QueryHelperTests
18from couchbase_helper.tuq_helper import N1QLHelper
19from scripts.install import InstallerJob
20from builds.build_query import BuildQuery
21from eventing.eventing_base import EventingBaseTest
22from pytests.eventing.eventing_constants import HANDLER_CODE
23from random import randrange, randint
24from fts.fts_base import FTSIndex, FTSBaseTest
25from pytests.fts.fts_callable import FTSCallable
26from cbas.cbas_base import CBASBaseTest
27from pprint import pprint
28from testconstants import CB_REPO, INDEX_QUOTA
29from testconstants import MV_LATESTBUILD_REPO
30from testconstants import SHERLOCK_BUILD_REPO
31from testconstants import COUCHBASE_VERSION_2
32from testconstants import COUCHBASE_VERSION_3
33from testconstants import COUCHBASE_VERSIONS
34from testconstants import SHERLOCK_VERSION
35from testconstants import CB_VERSION_NAME
36from testconstants import COUCHBASE_MP_VERSION
37from testconstants import CE_EE_ON_SAME_FOLDER
38from testconstants import STANDARD_BUCKET_PORT
39
40class NewUpgradeBaseTest(QueryHelperTests,EventingBaseTest, FTSBaseTest):
41    def setUp(self):
42        super(NewUpgradeBaseTest, self).setUp()
43        self.released_versions = ["2.0.0-1976-rel", "2.0.1", "2.5.0", "2.5.1",
44                                  "2.5.2", "3.0.0", "3.0.1",
45                                  "3.0.1-1444", "3.0.2", "3.0.2-1603", "3.0.3",
46                                  "3.1.0", "3.1.0-1776", "3.1.1", "3.1.1-1807",
47                                  "3.1.2", "3.1.2-1815", "3.1.3", "3.1.3-1823",
48                                  "4.0.0", "4.0.0-4051", "4.1.0", "4.1.0-5005"]
49        self.use_hostnames = self.input.param("use_hostnames", False)
50        self.product = self.input.param('product', 'couchbase-server')
51        self.initial_version = self.input.param('initial_version', '2.5.1-1083')
52        self.initial_vbuckets = self.input.param('initial_vbuckets', 1024)
53        self.upgrade_versions = self.input.param('upgrade_version', '4.1.0-4963')
54        self.upgrade_versions = self.upgrade_versions.split(";")
55        self.skip_cleanup = self.input.param("skip_cleanup", False)
56        self.init_nodes = self.input.param('init_nodes', True)
57
58        self.is_downgrade = self.input.param('downgrade', False)
59        if self.is_downgrade:
60            self.initial_version, self.upgrade_versions = self.upgrade_versions[0], [self.initial_version]
61
62        upgrade_path = self.input.param('upgrade_path', [])
63        if upgrade_path:
64            upgrade_path = upgrade_path.split(",")
65        self.upgrade_versions = upgrade_path + self.upgrade_versions
66        if self.input.param('released_upgrade_version', None) is not None:
67            self.upgrade_versions = [self.input.param('released_upgrade_version', None)]
68
69        self.initial_build_type = self.input.param('initial_build_type', None)
70        self.upgrade_build_type = self.input.param('upgrade_build_type', self.initial_build_type)
71        self.stop_persistence = self.input.param('stop_persistence', False)
72        self.rest_settings = self.input.membase_settings
73        self.rest = None
74        self.rest_helper = None
75        self.is_ubuntu = False
76        self.is_rpm = False
77        self.is_centos7 = False
78        self.sleep_time = 15
79        self.ddocs = []
80        self.item_flag = self.input.param('item_flag', 0)
81        self.expire_time = self.input.param('expire_time', 0)
82        self.wait_expire = self.input.param('wait_expire', False)
83        self.default_view_name = "upgrade-test-view"
84        self.ddocs_num = self.input.param("ddocs-num", 0)
85        self.view_num = self.input.param("view-per-ddoc", 2)
86        self.is_dev_ddoc = self.input.param("is-dev-ddoc", False)
87        self.offline_failover_upgrade = self.input.param("offline_failover_upgrade", False)
88        self.eventing_log_level = self.input.param('eventing_log_level', 'INFO')
89        self.src_bucket_name = self.input.param('src_bucket_name', 'src_bucket')
90        self.dst_bucket_name = self.input.param('dst_bucket_name', 'dst_bucket')
91        self.dst_bucket_name1 = self.input.param('dst_bucket_name1', 'dst_bucket1')
92        self.metadata_bucket_name = self.input.param('metadata_bucket_name', 'metadata')
93        self.cb_bucket_name = self.input.param('cb_bucket_name', 'travel-sample')
94        self.cb_server_ip = self.input.param("cb_server_ip", None)
95        self.cbas_bucket_name = self.input.param('cbas_bucket_name', 'travel')
96        self.cbas_dataset_name = self.input.param("cbas_dataset_name", 'travel_ds')
97        self.cbas_dataset_name_invalid = self.input.param('cbas_dataset_name_invalid',
98                                                                self.cbas_dataset_name)
99        self.cbas_bucket_name_invalid = self.input.param('cbas_bucket_name_invalid',
100                                                                 self.cbas_bucket_name)
101        self.use_memory_manager = self.input.param('use_memory_manager', True)
102        self.is_fts_in_pre_upgrade = self.input.param('is_fts_in_pre_upgrade', False)
103        self.num_index_replicas = self.input.param("num_index_replica", 0)
104        self.expected_err_msg = self.input.param("expected_err_msg", None)
105        self.during_ops = None
106        if "during-ops" in self.input.test_params:
107            self.during_ops = self.input.param("during-ops", None).split(",")
108        if self.initial_version.startswith("1.6") or self.initial_version.startswith("1.7"):
109            self.product = 'membase-server'
110        else:
111            self.product = 'couchbase-server'
112        if self.max_verify is None:
113            self.max_verify = min(self.num_items, 100000)
114        shell = RemoteMachineShellConnection(self.master)
115        type = shell.extract_remote_info().distribution_type
116        os_version = shell.extract_remote_info().distribution_version
117        shell.disconnect()
118        if type.lower() == 'windows':
119            self.is_linux = False
120        else:
121            self.is_linux = True
122        if type.lower() == "ubuntu":
123            self.is_ubuntu = True
124        if type.lower() == "centos":
125            self.is_rpm = True
126            if os_version.lower() == "centos 7":
127                self.is_centos7 = True
128        self.queue = Queue.Queue()
129        self.upgrade_servers = []
130        if self.initial_build_type == "community" and self.upgrade_build_type == "enterprise":
131            if self.initial_version != self.upgrade_versions:
132                self.log.warn(
133                    "we can't upgrade from couchbase CE to EE with a different version,defaulting to initial_version")
134                self.log.warn("http://developer.couchbase.com/documentation/server/4.0/install/upgrading.html")
135                self.upgrade_versions = self.input.param('initial_version', '4.1.0-4963')
136                self.upgrade_versions = self.upgrade_versions.split(";")
137        self.fts_obj = None
138        self.n1ql_helper = None
139        self.index_name_prefix = None
140
141    def tearDown(self):
142        test_failed = (hasattr(self, '_resultForDoCleanups') and \
143                       len(self._resultForDoCleanups.failures or \
144                           self._resultForDoCleanups.errors)) or \
145                                 (hasattr(self, '_exc_info') and \
146                                  self._exc_info()[1] is not None)
147        if test_failed and self.skip_cleanup:
148                self.log.warn("CLEANUP WAS SKIPPED DUE TO FAILURES IN UPGRADE TEST")
149                self.cluster.shutdown(force=True)
150                self.log.info("Test Input params were:")
151                pprint(self.input.test_params)
152
153                if self.input.param('BUGS', False):
154                    self.log.warn("Test failed. Possible reason is: {0}"
155                                           .format(self.input.param('BUGS', False)))
156        else:
157            if not hasattr(self, 'rest'):
158                return
159            try:
160                # cleanup only nodes that are in cluster
161                # not all servers have been installed
162                if self.rest is None:
163                    self._new_master(self.master)
164                nodes = self.rest.get_nodes()
165                temp = []
166                for server in self.servers:
167                    if server.ip in [node.ip for node in nodes]:
168                        temp.append(server)
169                self.servers = temp
170            except Exception, e:
171                if e:
172                    print "Exception ", e
173                self.cluster.shutdown(force=True)
174                self.fail(e)
175            super(NewUpgradeBaseTest, self).tearDown()
176            if self.upgrade_servers:
177                self._install(self.upgrade_servers,version=self.initial_version)
178        self.sleep(20, "sleep 20 seconds before run next test")
179
180    def _install(self, servers, version=None, community_to_enterprise=False):
181        params = {}
182        params['num_nodes'] = len(servers)
183        params['product'] = self.product
184        params['version'] = self.initial_version
185        params['vbuckets'] = [self.initial_vbuckets]
186        params['init_nodes'] = self.init_nodes
187        if 5 <= int(self.initial_version[:1]) or 5 <= int(self.upgrade_versions[0][:1]):
188            params['fts_query_limit'] = 10000000
189        if version:
190            params['version'] = version
191        if self.initial_build_type is not None:
192            params['type'] = self.initial_build_type
193        if community_to_enterprise:
194            params['type'] = self.upgrade_build_type
195        self.log.info("will install {0} on {1}".format(params['version'], [s.ip for s in servers]))
196        InstallerJob().parallel_install(servers, params)
197        self.add_built_in_server_user()
198        if self.product in ["couchbase", "couchbase-server", "cb"]:
199            success = True
200            for server in servers:
201                shell = RemoteMachineShellConnection(server)
202                info = shell.extract_remote_info()
203                success &= shell.is_couchbase_installed()
204                self.sleep(5, "sleep 5 seconds to let cb up completely")
205                ready = RestHelper(RestConnection(server)).is_ns_server_running(60)
206                if not ready:
207                    if "cento 7" in info.distribution_version.lower():
208                        self.log.info("run systemctl daemon-reload")
209                        shell.execute_command("systemctl daemon-reload", debug=False)
210                        shell.start_server()
211                    else:
212                        log.error("Couchbase-server did not start...")
213                shell.disconnect()
214                if not success:
215                    sys.exit("some nodes were not install successfully!")
216        if self.rest is None:
217            self._new_master(self.master)
218        if self.use_hostnames:
219            for server in self.servers[:self.nodes_init]:
220                hostname = RemoteUtilHelper.use_hostname_for_server_settings(server)
221                server.hostname = hostname
222
223    def operations(self, servers, services=None):
224        if services is not None:
225            if "-" in services:
226                set_services = services.split("-")
227            else:
228                set_services = services.split(",")
229        else:
230            set_services = services
231
232        if 4.5 > float(self.initial_version[:3]):
233            self.gsi_type = "forestdb"
234        for service in set_services:
235            if "index" in service:
236                self.log.info("set index quota to {0}".format(INDEX_QUOTA))
237                RestConnection(servers[0]).set_service_memoryQuota(service='indexMemoryQuota',
238                                                                   memoryQuota=INDEX_QUOTA)
239                break
240        self.quota = self._initialize_nodes(self.cluster, servers,
241                                            self.disabled_consistent_view,
242                                            self.rebalanceIndexWaitingDisabled,
243                                            self.rebalanceIndexPausingDisabled,
244                                            self.maxParallelIndexers,
245                                            self.maxParallelReplicaIndexers, self.port,
246                                            services=set_services)
247        if self.port and self.port != '8091':
248            self.rest = RestConnection(self.master)
249            self.rest_helper = RestHelper(self.rest)
250        if 5.0 <= float(self.initial_version[:3]):
251            self.add_built_in_server_user()
252        self.sleep(20, "wait to make sure node is ready")
253        if len(servers) > 1:
254            if services is None:
255                self.cluster.rebalance([servers[0]], servers[1:], [],
256                                       use_hostnames=self.use_hostnames)
257            else:
258                for i in range(1, len(set_services)):
259                    self.cluster.rebalance([servers[0]], [servers[i]], [],
260                                           use_hostnames=self.use_hostnames,
261                                           services=[set_services[i]])
262                    self.sleep(10)
263
264        self.buckets = []
265        gc.collect()
266        if self.input.param('extra_verification', False):
267            self.total_buckets += 2
268#        if not self.total_buckets:
269#            self.total_buckets = 1
270        self.bucket_size = self._get_bucket_size(self.quota, self.total_buckets)
271        if self.dgm_run:
272            self.bucket_size = 256
273        self._bucket_creation()
274        if self.stop_persistence:
275            for server in servers:
276                for bucket in self.buckets:
277                    client = MemcachedClientHelper.direct_client(server, bucket)
278                    client.stop_persistence()
279            self.sleep(10)
280        gen_load = BlobGenerator('upgrade', 'upgrade-', self.value_size, end=self.num_items)
281        if self.is_fts_in_pre_upgrade:
282            self.create_fts_index_query_compare()
283        else:
284            self._load_all_buckets(self.master, gen_load, "create", self.expire_time,
285                                                             flag=self.item_flag)
286        if not self.stop_persistence:
287            self._wait_for_stats_all_buckets(servers)
288        else:
289            for bucket in self.buckets:
290                drain_rate = 0
291                for server in servers:
292                    client = MemcachedClientHelper.direct_client(server, bucket)
293                    drain_rate += int(client.stats()["ep_queue_size"])
294                self.sleep(3, "Pause to load all items")
295                self.assertEqual(self.num_items * (self.num_replicas + 1), drain_rate,
296                    "Persistence is stopped, drain rate is incorrect %s. Expected %s" % (
297                                    drain_rate, self.num_items * (self.num_replicas + 1)))
298        self.change_settings()
299
300    def _get_build(self, server, version, remote, is_amazon=False, info=None):
301        if info is None:
302            info = remote.extract_remote_info()
303        build_repo = CB_REPO
304        if version[:5] in COUCHBASE_VERSIONS:
305            if version[:3] in CB_VERSION_NAME:
306                build_repo = CB_REPO + CB_VERSION_NAME[version[:3]] + "/"
307            elif version[:5] in COUCHBASE_MP_VERSION:
308                build_repo = MV_LATESTBUILD_REPO
309
310        if self.upgrade_build_type == "community":
311            edition_type = "couchbase-server-community"
312        else:
313            edition_type = "couchbase-server-enterprise"
314
315        builds, changes = BuildQuery().get_all_builds(version=version, timeout=self.wait_timeout * 5, \
316                                                      deliverable_type=info.deliverable_type,
317                                                      architecture_type=info.architecture_type, \
318                                                      edition_type=edition_type, repo=build_repo, \
319                                                      distribution_version=info.distribution_version.lower())
320
321        self.log.info("finding build %s for machine %s" % (version, server))
322
323        if re.match(r'[1-9].[0-9].[0-9]-[0-9]+$', version):
324            version = version + "-rel"
325        if version[:5] in self.released_versions:
326            appropriate_build = BuildQuery().\
327                find_couchbase_release_build('%s-enterprise' % (self.product),
328                                           info.deliverable_type,
329                                           info.architecture_type,
330                                           version.strip(),
331                                           is_amazon=is_amazon,
332                                           os_version=info.distribution_version)
333        else:
334             appropriate_build = BuildQuery().\
335                find_build(builds, '%s-enterprise' % (self.product), info.deliverable_type,
336                                   info.architecture_type, version.strip())
337
338        if appropriate_build is None:
339            self.log.info("builds are: %s \n. Remote is %s, %s. Result is: %s" % (builds, remote.ip, remote.username, version))
340            raise Exception("Build %s for machine %s is not found" % (version, server))
341        return appropriate_build
342
343    def _upgrade(self, upgrade_version, server, queue=None, skip_init=False, info=None,
344                       save_upgrade_config=False, fts_query_limit=None, debug_logs=False):
345        try:
346            remote = RemoteMachineShellConnection(server)
347            appropriate_build = self._get_build(server, upgrade_version, remote, info=info)
348            self.assertTrue(appropriate_build.url,
349                            msg="unable to find build {0}".format(upgrade_version))
350            self.assertTrue(remote.download_build(appropriate_build),
351                                          "Build wasn't downloaded!")
352            o, e = remote.couchbase_upgrade(appropriate_build,\
353                                            save_upgrade_config=save_upgrade_config,\
354                                            forcefully=self.is_downgrade,
355                                            fts_query_limit=fts_query_limit, debug_logs=debug_logs)
356            self.log.info("upgrade {0} to version {1} is completed".format(server.ip, upgrade_version))
357            """ remove this line when bug MB-11807 fixed """
358            if self.is_ubuntu:
359                remote.start_server()
360            """ remove end here """
361            if 5.0 > float(self.initial_version[:3]) and self.is_centos7:
362                remote.execute_command("systemctl daemon-reload")
363                remote.start_server()
364            self.rest = RestConnection(server)
365            if self.is_linux:
366                self.wait_node_restarted(server, wait_time=testconstants.NS_SERVER_TIMEOUT * 4, wait_if_warmup=True)
367            else:
368                self.wait_node_restarted(server, wait_time=testconstants.NS_SERVER_TIMEOUT * 10, wait_if_warmup=True, check_service=True)
369            if not skip_init:
370                self.rest.init_cluster(self.rest_settings.rest_username, self.rest_settings.rest_password)
371            self.sleep(self.sleep_time)
372            remote.disconnect()
373            self.sleep(10)
374            return o, e
375        except Exception, e:
376            print traceback.extract_stack()
377            if queue is not None:
378                queue.put(False)
379                if not self.is_linux:
380                    remote = RemoteMachineShellConnection(server)
381                    output, error = remote.execute_command("cmd /c schtasks /Query /FO LIST /TN removeme /V")
382                    remote.log_command_output(output, error)
383                    output, error = remote.execute_command("cmd /c schtasks /Query /FO LIST /TN installme /V")
384                    remote.log_command_output(output, error)
385                    output, error = remote.execute_command("cmd /c schtasks /Query /FO LIST /TN upgrademe /V")
386                    remote.log_command_output(output, error)
387                    remote.disconnect()
388                raise e
389        if queue is not None:
390            queue.put(True)
391
392    def _async_update(self, upgrade_version, servers, queue=None, skip_init=False,
393                      info=None, save_upgrade_config=False,
394                      fts_query_limit=None, debug_logs=False):
395        self.log.info("servers {0} will be upgraded to {1} version".
396                      format([server.ip for server in servers], upgrade_version))
397        q = queue or self.queue
398        upgrade_threads = []
399        for server in servers:
400            upgrade_thread = Thread(target=self._upgrade,
401                                    name="upgrade_thread" + server.ip,
402                                    args=(upgrade_version, server, q, skip_init, info,
403                                          save_upgrade_config, fts_query_limit,
404                                          debug_logs))
405            upgrade_threads.append(upgrade_thread)
406            upgrade_thread.start()
407        return upgrade_threads
408
409    def _new_master(self, server):
410        self.master = server
411        self.rest = RestConnection(self.master)
412        self.rest_helper = RestHelper(self.rest)
413
414    def verification(self, servers, check_items=True):
415        if self.use_hostnames:
416            for server in servers:
417                node_info = RestConnection(server).get_nodes_self()
418                new_hostname = node_info.hostname
419                self.assertEqual("%s:%s" % (server.hostname, server.port), new_hostname,
420                                 "Hostname is incorrect for server %s. Settings are %s" % (server.ip, new_hostname))
421        if self.master.ip != self.rest.ip or \
422           self.master.ip == self.rest.ip and str(self.master.port) != str(self.rest.port):
423            if self.port:
424                self.master.port = self.port
425            self.rest = RestConnection(self.master)
426            self.rest_helper = RestHelper(self.rest)
427        if self.port and self.port != '8091':
428            settings = self.rest.get_cluster_settings()
429            if settings and 'port' in settings:
430                self.assertTrue(self.port == str(settings['port']),
431                                'Expected cluster port is %s, but is %s' % (self.port, settings['port']))
432        for bucket in self.buckets:
433            if not self.rest_helper.bucket_exists(bucket.name):
434                raise Exception("bucket: %s not found" % bucket.name)
435        self.verify_cluster_stats(servers, max_verify=self.max_verify, \
436                                  timeout=self.wait_timeout * 20, check_items=check_items)
437
438        if self.ddocs:
439            self.verify_all_queries()
440        if "update_notifications" in self.input.test_params:
441            if self.rest.get_notifications() != self.input.param("update_notifications", True):
442                self.fail("update notifications settings wasn't saved")
443        if "autofailover_timeout" in self.input.test_params:
444            if self.rest.get_autofailover_settings().timeout != self.input.param("autofailover_timeout", None):
445                self.fail("autofailover settings wasn't saved")
446        if "autofailover_alerts" in self.input.test_params:
447            alerts = self.rest.get_alerts_settings()
448            if alerts["recipients"] != ['couchbase@localhost']:
449                self.fail("recipients value wasn't saved")
450            if alerts["sender"] != 'root@localhost':
451                self.fail("sender value wasn't saved")
452            if alerts["emailServer"]["user"] != 'user':
453                self.fail("email_username value wasn't saved")
454            if alerts["emailServer"]["pass"] != '':
455                self.fail("email_password should be empty for security")
456        if "autocompaction" in self.input.test_params:
457            cluster_status = self.rest.cluster_status()
458            if cluster_status["autoCompactionSettings"]["viewFragmentationThreshold"]\
459                             ["percentage"] != self.input.param("autocompaction", 50):
460                    self.log.info("Cluster status: {0}".format(cluster_status))
461                    self.fail("autocompaction settings weren't saved")
462
463    def verify_all_queries(self):
464        query = {"connectionTimeout" : 60000}
465        expected_rows = self.num_items
466        if self.max_verify:
467            expected_rows = self.max_verify
468            query["limit"] = expected_rows
469        if self.input.param("wait_expiration", False):
470            expected_rows = 0
471        for bucket in self.buckets:
472            for ddoc in self.ddocs:
473                prefix = ("", "dev_")[ddoc.views[0].dev_view]
474                self.perform_verify_queries(len(ddoc.views), prefix, ddoc.name, query, bucket=bucket,
475                                           wait_time=self.wait_timeout * 5, expected_rows=expected_rows,
476                                           retry_time=10)
477
478    def change_settings(self):
479        status = True
480        if "update_notifications" in self.input.test_params:
481            status &= self.rest.update_notifications(str(self.input.param("update_notifications", 'true')).lower())
482        if "autofailover_timeout" in self.input.test_params:
483            status &= self.rest.update_autofailover_settings(True, self.input.param("autofailover_timeout", None))
484        if "autofailover_alerts" in self.input.test_params:
485            status &= self.rest.set_alerts_settings('couchbase@localhost', 'root@localhost', 'user', 'pwd')
486        if "autocompaction" in self.input.test_params:
487            tmp, _, _ = self.rest.set_auto_compaction(viewFragmntThresholdPercentage=
488                                     self.input.param("autocompaction", 50))
489            status &= tmp
490            if not status:
491                self.fail("some settings were not set correctly!")
492
493    def warm_up_node(self, warmup_nodes=None):
494        if not warmup_nodes:
495            warmup_nodes = [self.servers[:self.nodes_init][-1], ]
496        for warmup_node in warmup_nodes:
497            shell = RemoteMachineShellConnection(warmup_node)
498            shell.stop_couchbase()
499            shell.disconnect()
500        self.sleep(20)
501        for warmup_node in warmup_nodes:
502            shell = RemoteMachineShellConnection(warmup_node)
503            shell.start_couchbase()
504            shell.disconnect()
505        ClusterOperationHelper.wait_for_ns_servers_or_assert(warmup_nodes, self)
506
507    def start_index(self):
508        if self.ddocs:
509            query = {"connectionTimeout" : 60000}
510            for bucket in self.buckets:
511                for ddoc in self.ddocs:
512                    prefix = ("", "dev_")[ddoc.views[0].dev_view]
513                    self.perform_verify_queries(len(ddoc.views), prefix, ddoc.name, query, bucket=bucket)
514
515    def failover(self):
516        rest = RestConnection(self.master)
517        nodes = rest.node_statuses()
518        nodes = [node for node in nodes
519                if node.ip != self.master.ip or str(node.port) != self.master.port]
520        self.failover_node = nodes[0]
521        rest.fail_over(self.failover_node.id)
522
523    def add_back_failover(self):
524        rest = RestConnection(self.master)
525        rest.add_back_node(self.failover_node.id)
526
527    def create_ddocs_and_views(self, server=None):
528        server_in_cluster = self.master
529        if server is not None:
530            self.buckets = RestConnection(server).get_buckets()
531            server_in_cluster = server
532        self.default_view = View(self.default_view_name, None, None)
533        for bucket in self.buckets:
534            for i in xrange(int(self.ddocs_num)):
535                views = self.make_default_views(self.default_view_name, self.view_num,
536                                               self.is_dev_ddoc, different_map=True)
537                ddoc = DesignDocument(self.default_view_name + str(i), views)
538                self.ddocs.append(ddoc)
539                for view in views:
540                    self.cluster.create_view(server_in_cluster, ddoc.name,
541                                             view, bucket=bucket)
542
543    def delete_data(self, servers, paths_to_delete):
544        for server in servers:
545            shell = RemoteMachineShellConnection(server)
546            for path in paths_to_delete:
547                output, error = shell.execute_command("rm -rf {0}".format(path))
548                shell.log_command_output(output, error)
549                # shell._ssh_client.open_sftp().rmdir(path)
550            shell.disconnect()
551
552    def check_seqno(self, seqno_expected, comparator='=='):
553        for bucket in self.buckets:
554            if bucket.type == 'memcached':
555                continue
556            ready = BucketOperationHelper.wait_for_memcached(self.master,
557                                                          bucket.name)
558            self.assertTrue(ready, "wait_for_memcached failed")
559            client = VBucketAwareMemcached(RestConnection(self.master), bucket)
560            valid_keys, deleted_keys = bucket.kvs[1].key_set()
561            for valid_key in valid_keys:
562                try:
563                    _, flags, exp, seqno, cas = client.memcached(valid_key).getMeta(valid_key)
564                except MemcachedError, e:
565                    print e
566                    client.reset(RestConnection(self.master))
567                    _, flags, exp, seqno, cas = client.memcached(valid_key).getMeta(valid_key)
568                self.assertTrue((comparator == '==' and seqno == seqno_expected) or
569                                (comparator == '>=' and seqno >= seqno_expected),
570                                msg="seqno {0} !{1} {2} for key:{3}".
571                                format(seqno, comparator, seqno_expected, valid_key))
572            client.done()
573
574    def force_reinstall(self, servers):
575        for server in servers:
576            try:
577                remote = RemoteMachineShellConnection(server)
578                appropriate_build = self._get_build(server, self.initial_version, remote)
579                self.assertTrue(appropriate_build.url, msg="unable to find build {0}".format(self.initial_version))
580                remote.download_build(appropriate_build)
581                remote.install_server(appropriate_build, force=True)
582                self.log.info("upgrade {0} to version {1} is completed".format(server.ip, self.initial_version))
583                remote.disconnect()
584                self.sleep(10)
585                if self.is_linux:
586                    self.wait_node_restarted(server, wait_time=testconstants.NS_SERVER_TIMEOUT * 4, wait_if_warmup=True)
587                else:
588                    self.wait_node_restarted(server, wait_time=testconstants.NS_SERVER_TIMEOUT * 10, wait_if_warmup=True, check_service=True)
589            except Exception, e:
590                print traceback.extract_stack()
591                if queue is not None:
592                    queue.put(False)
593                    if not self.is_linux:
594                        remote = RemoteMachineShellConnection(server)
595                        output, error = remote.execute_command("cmd /c schtasks /Query /FO LIST /TN installme /V")
596                        remote.log_command_output(output, error)
597                        remote.disconnect()
598                    raise e
599
600    def _verify_vbucket_nums_for_swap(self, old_vbs, new_vbs):
601        out_servers = set(old_vbs) - set(new_vbs)
602        in_servers = set(new_vbs) - set(old_vbs)
603        self.assertEqual(len(out_servers), len(in_servers),
604                        "Seems like it wasn't swap rebalance. Out %s, in %s" % (
605                                                len(out_servers),len(in_servers)))
606        for vb_type in ["active_vb", "replica_vb"]:
607            self.log.info("Checking %s on nodes that remain in cluster..." % vb_type)
608            for server, stats in old_vbs.iteritems():
609                if server in new_vbs:
610                    self.assertTrue(sorted(stats[vb_type]) == sorted(new_vbs[server][vb_type]),
611                    "Server %s Seems like %s vbuckets were shuffled, old vbs is %s, new are %s" %(
612                                    server.ip, vb_type, stats[vb_type], new_vbs[server][vb_type]))
613            self.log.info("%s vbuckets were not suffled" % vb_type)
614            self.log.info("Checking in-out nodes...")
615            vbs_servs_out = vbs_servs_in = []
616            for srv, stat in old_vbs.iteritems():
617                if srv in out_servers:
618                    vbs_servs_out.extend(stat[vb_type])
619            for srv, stat in new_vbs.iteritems():
620                if srv in in_servers:
621                    vbs_servs_in.extend(stat[vb_type])
622            self.assertTrue(sorted(vbs_servs_out) == sorted(vbs_servs_in),
623                            "%s vbuckets seem to be suffled" % vb_type)
624
625    def monitor_dcp_rebalance(self):
626        if self.input.param('initial_version', '')[:5] in COUCHBASE_VERSION_2 and \
627           (self.input.param('upgrade_version', '')[:5] in COUCHBASE_VERSION_3 or \
628            self.input.param('upgrade_version', '')[:5] in SHERLOCK_VERSION):
629            if int(self.initial_vbuckets) >= 256:
630                if self.master.ip != self.rest.ip or \
631                   self.master.ip == self.rest.ip and \
632                   str(self.master.port) != str(self.rest.port):
633                    if self.port:
634                        self.master.port = self.port
635                    self.rest = RestConnection(self.master)
636                    self.rest_helper = RestHelper(self.rest)
637                if self.rest._rebalance_progress_status() == 'running':
638                    self.log.info("Start monitoring DCP upgrade from {0} to {1}"\
639                           .format(self.input.param('initial_version', '')[:5], \
640                                    self.input.param('upgrade_version', '')[:5]))
641                    status = self.rest.monitorRebalance()
642                    if status:
643                        self.log.info("Done DCP rebalance upgrade!")
644                    else:
645                        self.fail("Failed DCP rebalance upgrade")
646                elif self.sleep(5) is None and any ("DCP upgrade completed successfully." \
647                                    in d.values() for d in self.rest.get_logs(10)):
648                    self.log.info("DCP upgrade is completed")
649                else:
650                    self.fail("DCP reabalance upgrade is not running")
651            else:
652                self.fail("Need vbuckets setting >= 256 for upgrade from 2.x.x to 3+")
653        else:
654            if self.master.ip != self.rest.ip:
655                self.rest = RestConnection(self.master)
656                self.rest_helper = RestHelper(self.rest)
657            self.log.info("No need to do DCP rebalance upgrade")
658
659    def dcp_rebalance_in_offline_upgrade_from_version2(self):
660        if self.input.param('initial_version', '')[:5] in COUCHBASE_VERSION_2 and \
661           (self.input.param('upgrade_version', '')[:5] in COUCHBASE_VERSION_3 or \
662            self.input.param('upgrade_version', '')[:5] in SHERLOCK_VERSION) and \
663            self.input.param('num_stoped_nodes', self.nodes_init) >= self.nodes_init:
664            otpNodes = []
665            nodes = self.rest.node_statuses()
666            for node in nodes:
667                otpNodes.append(node.id)
668            self.log.info("Start DCP rebalance after complete offline upgrade from {0} to {1}"\
669                           .format(self.input.param('initial_version', '')[:5], \
670                                   self.input.param('upgrade_version', '')[:5]))
671            self.rest.rebalance(otpNodes, [])
672            """ verify DCP upgrade in 3.0.0 version """
673            self.monitor_dcp_rebalance()
674        else:
675            self.log.info("No need to do DCP rebalance upgrade")
676
677    def pre_upgrade(self, servers):
678        if self.rest is None:
679            self._new_master(self.master)
680        self.ddocs_num = 0
681        self.create_ddocs_and_views()
682        verify_data = False
683        if self.scan_consistency != "request_plus":
684            verify_data = True
685        self.load(self.gens_load, flag=self.item_flag,
686                  verify_data=verify_data, batch_size=self.batch_size)
687        rest = RestConnection(servers[0])
688        output, rq_content, header = rest.set_auto_compaction(dbFragmentThresholdPercentage=20, viewFragmntThresholdPercentage=20)
689        self.assertTrue(output, "Error in set_auto_compaction... {0}".format(rq_content))
690        status, content, header = rest.set_indexer_compaction(mode="full", fragmentation=20)
691        self.assertTrue(status, "Error in setting Append Only Compaction... {0}".format(content))
692        operation_type = self.input.param("pre_upgrade", "")
693        self.run_async_index_operations(operation_type)
694
695    def during_upgrade(self, servers):
696        self.ddocs_num = 0
697        self.create_ddocs_and_views()
698        kv_tasks = self.async_run_doc_ops()
699        operation_type = self.input.param("during_upgrade", "")
700        self.run_async_index_operations(operation_type)
701        for task in kv_tasks:
702            task.result()
703
704    def post_upgrade(self, servers):
705        self.log.info(" Doing post upgrade")
706        self.ddocs_num = 0
707        self.add_built_in_server_user()
708        self.create_ddocs_and_views(servers[0])
709        kv_tasks = self.async_run_doc_ops()
710        operation_type = self.input.param("post_upgrade", "")
711        self.run_async_index_operations(operation_type)
712        for task in kv_tasks:
713            task.result()
714        self.verification(servers,check_items=False)
715
716    def _create_ephemeral_buckets(self):
717        create_ephemeral_buckets = self.input.param(
718            "create_ephemeral_buckets", False)
719        if not create_ephemeral_buckets:
720            return
721        rest = RestConnection(self.master)
722        versions = rest.get_nodes_versions()
723        for version in versions:
724            if "5" > version:
725                self.log.info("Atleast one of the nodes in the cluster is "
726                              "pre 5.0 version. Hence not creating ephemeral"
727                              "bucket for the cluster.")
728                return
729        num_ephemeral_bucket = self.input.param("num_ephemeral_bucket", 1)
730        server = self.master
731        server_id = RestConnection(server).get_nodes_self().id
732        ram_size = RestConnection(server).get_nodes_self().memoryQuota
733        bucket_size = self._get_bucket_size(ram_size, self.bucket_size +
734                                                      num_ephemeral_bucket)
735        self.log.info("Creating ephemeral buckets")
736        self.log.info("Changing the existing buckets size to accomodate new "
737                      "buckets")
738        for bucket in self.buckets:
739            rest.change_bucket_props(bucket, ramQuotaMB=bucket_size)
740
741        bucket_tasks = []
742        bucket_params = copy.deepcopy(
743            self.bucket_base_params['membase']['non_ephemeral'])
744        bucket_params['size'] = bucket_size
745        bucket_params['bucket_type'] = 'ephemeral'
746        bucket_params['eviction_policy'] = 'noEviction'
747        ephemeral_buckets = []
748        self.log.info("Creating ephemeral buckets now")
749        for i in range(num_ephemeral_bucket):
750            name = 'ephemeral_bucket' + str(i)
751            port = STANDARD_BUCKET_PORT + i + 1
752            bucket_priority = None
753            if self.standard_bucket_priority is not None:
754                bucket_priority = self.get_bucket_priority(
755                    self.standard_bucket_priority[i])
756
757            bucket_params['bucket_priority'] = bucket_priority
758            bucket_tasks.append(
759                self.cluster.async_create_standard_bucket(name=name, port=port,
760                                                          bucket_params=bucket_params))
761            bucket = Bucket(name=name, authType=None, saslPassword=None,
762                            num_replicas=self.num_replicas,
763                            bucket_size=self.bucket_size,
764                            port=port, master_id=server_id,
765                            eviction_policy='noEviction', lww=self.lww)
766            self.buckets.append(bucket)
767            ephemeral_buckets.append(bucket)
768
769        for task in bucket_tasks:
770            task.result(self.wait_timeout * 10)
771
772        if self.enable_time_sync:
773            self._set_time_sync_on_buckets(
774                ['standard_bucket' + str(i) for i in range(
775                    num_ephemeral_bucket)])
776        load_gen = BlobGenerator('upgrade', 'upgrade-', self.value_size,
777                                 end=self.num_items)
778        for bucket in ephemeral_buckets:
779            self._load_bucket(bucket, self.master, load_gen, "create",
780                              self.expire_time)
781
782    def _return_maps(self):
783        index_map = self.get_index_map()
784        stats_map = self.get_index_stats(perNode=False)
785        return index_map, stats_map
786
787    def create_fts_index(self):
788        try:
789            self.log.info("Checking if index already exists ...")
790            name = "default"
791            """ test on one bucket """
792            for bucket in self.buckets:
793                name = bucket.name
794                break
795            SOURCE_CB_PARAMS = {
796                      "authUser": "default",
797                      "authPassword": "",
798                      "authSaslUser": "",
799                      "authSaslPassword": "",
800                      "clusterManagerBackoffFactor": 0,
801                      "clusterManagerSleepInitMS": 0,
802                      "clusterManagerSleepMaxMS": 20000,
803                      "dataManagerBackoffFactor": 0,
804                      "dataManagerSleepInitMS": 0,
805                      "dataManagerSleepMaxMS": 20000,
806                      "feedBufferSizeBytes": 0,
807                      "feedBufferAckThreshold": 0
808                       }
809            self.index_type = 'fulltext-index'
810            self.index_definition = {
811                          "type": "fulltext-index",
812                          "name": "",
813                          "uuid": "",
814                          "params": {},
815                          "sourceType": "couchbase",
816                          "sourceName": "",
817                          "sourceUUID": "",
818                          "sourceParams": SOURCE_CB_PARAMS,
819                          "planParams": {}
820                          }
821            self.name = self.index_definition['name'] = \
822                                self.index_definition['sourceName'] = name
823            fts_node = self.get_nodes_from_services_map("fts", \
824                                servers=self.get_nodes_in_cluster_after_upgrade())
825            if fts_node:
826                rest = RestConnection(fts_node)
827                status, _ = rest.get_fts_index_definition(self.name)
828                if status != 400:
829                    rest.delete_fts_index(self.name)
830                self.log.info("Creating {0} {1} on {2}".format(self.index_type,
831                                                           self.name, rest.ip))
832                rest.create_fts_index(self.name, self.index_definition)
833            else:
834                raise("No FTS node in cluster")
835            self.ops_dist_map = self.calculate_data_change_distribution(
836                create_per=self.create_ops_per , update_per=self.update_ops_per ,
837                delete_per=self.delete_ops_per, expiry_per=self.expiry_ops_per,
838                start=0, end=self.docs_per_day)
839            self.log.info(self.ops_dist_map)
840            self.dataset = "simple"
841            self.docs_gen_map = self.generate_ops_docs(self.docs_per_day, 0)
842            self.async_ops_all_buckets(self.docs_gen_map, batch_size=100)
843        except Exception, ex:
844            self.log.info(ex)
845
846    def get_nodes_in_cluster_after_upgrade(self, master_node=None):
847        rest = None
848        if master_node == None:
849            rest = RestConnection(self.master)
850        else:
851            rest = RestConnection(master_node)
852        nodes = rest.node_statuses()
853        server_set = []
854        for node in nodes:
855            for server in self.input.servers:
856                if server.ip == node.ip:
857                    server_set.append(server)
858        return server_set
859
860
861    def create_eventing_services(self):
862        """ Only work after cluster upgrade to 5.5.0 completely """
863        try:
864            rest = RestConnection(self.master)
865            cb_version = rest.get_nodes_version()
866            if 5.5 > float(cb_version[:3]):
867                self.log.info("This eventing test is only for cb version 5.5 and later.")
868                return
869
870            bucket_params = self._create_bucket_params(server=self.master, size=128,
871                                                       replicas=self.num_replicas)
872            self.cluster.create_standard_bucket(name=self.src_bucket_name, port=STANDARD_BUCKET_PORT + 1,
873                                                bucket_params=bucket_params)
874            self.buckets = RestConnection(self.master).get_buckets()
875            self.src_bucket = RestConnection(self.master).get_buckets()
876            self.cluster.create_standard_bucket(name=self.dst_bucket_name, port=STANDARD_BUCKET_PORT + 1,
877                                                bucket_params=bucket_params)
878            self.cluster.create_standard_bucket(name=self.metadata_bucket_name, port=STANDARD_BUCKET_PORT + 1,
879                                                bucket_params=bucket_params)
880            self.buckets = RestConnection(self.master).get_buckets()
881            self.gens_load = self.generate_docs(self.docs_per_day)
882            self.expiry = 3
883
884            self.restServer = self.get_nodes_from_services_map(service_type="eventing")
885            """ must be self.rest to pass in deploy_function"""
886            self.rest = RestConnection(self.restServer)
887
888            self.load(self.gens_load, buckets=self.buckets, flag=self.item_flag, verify_data=False,
889                  batch_size=self.batch_size)
890            function_name = "Function_{0}_{1}".format(randint(1, 1000000000), self._testMethodName)
891            self.function_name = function_name[0:90]
892            body = self.create_save_function_body(self.function_name, HANDLER_CODE.BUCKET_OPS_ON_UPDATE, worker_count=3)
893            bk_events_created = False
894            rs_events_created = False
895            try:
896                self.deploy_function(body)
897                bk_events_created = True
898                self.verify_eventing_results(self.function_name, self.docs_per_day * 2016, skip_stats_validation=True)
899            except Exception as e:
900                self.log.error(e)
901            finally:
902                self.undeploy_and_delete_function(body)
903        except Exception, e:
904            self.log.info(e)
905
906    def generate_docs_simple(self, num_items, start=0):
907        from couchbase_helper.tuq_generators import JsonGenerator
908        json_generator = JsonGenerator()
909        return json_generator.generate_docs_simple(start=start, docs_per_day=self.docs_per_day)
910
911    def generate_docs(self, num_items, start=0):
912        try:
913            if self.dataset == "simple":
914                return self.generate_docs_simple(num_items, start)
915            if self.dataset == "array":
916                return self.generate_docs_array(num_items, start)
917            return getattr(self, 'generate_docs_' + self.dataset)(num_items, start)
918        except Exception, ex:
919            log.info(str(ex))
920            self.fail("There is no dataset %s, please enter a valid one" % self.dataset)
921
922    def create_save_function_body_test(self, appname, appcode, description="Sample Description",
923                                  checkpoint_interval=10000, cleanup_timers=False,
924                                  dcp_stream_boundary="everything", deployment_status=True,
925                                  skip_timer_threshold=86400,
926                                  sock_batch_size=1, tick_duration=60000, timer_processing_tick_interval=500,
927                                  timer_worker_pool_size=3, worker_count=3, processing_status=True,
928                                  cpp_worker_thread_count=1, multi_dst_bucket=False, execution_timeout=3,
929                                  data_chan_size=10000, worker_queue_cap=100000, deadline_timeout=6
930                                  ):
931        body = {}
932        body['appname'] = appname
933        script_dir = os.path.dirname(__file__)
934        abs_file_path = os.path.join(script_dir, appcode)
935        fh = open(abs_file_path, "r")
936        body['appcode'] = fh.read()
937        fh.close()
938        body['depcfg'] = {}
939        body['depcfg']['buckets'] = []
940        body['depcfg']['buckets'].append({"alias": self.dst_bucket_name, "bucket_name": self.dst_bucket_name})
941        if multi_dst_bucket:
942            body['depcfg']['buckets'].append({"alias": self.dst_bucket_name1, "bucket_name": self.dst_bucket_name1})
943        body['depcfg']['metadata_bucket'] = self.metadata_bucket_name
944        body['depcfg']['source_bucket'] = self.src_bucket_name
945        body['settings'] = {}
946        body['settings']['checkpoint_interval'] = checkpoint_interval
947        body['settings']['cleanup_timers'] = cleanup_timers
948        body['settings']['dcp_stream_boundary'] = dcp_stream_boundary
949        body['settings']['deployment_status'] = deployment_status
950        body['settings']['description'] = description
951        body['settings']['log_level'] = self.eventing_log_level
952        body['settings']['skip_timer_threshold'] = skip_timer_threshold
953        body['settings']['sock_batch_size'] = sock_batch_size
954        body['settings']['tick_duration'] = tick_duration
955        body['settings']['timer_processing_tick_interval'] = timer_processing_tick_interval
956        body['settings']['timer_worker_pool_size'] = timer_worker_pool_size
957        body['settings']['worker_count'] = worker_count
958        body['settings']['processing_status'] = processing_status
959        body['settings']['cpp_worker_thread_count'] = cpp_worker_thread_count
960        body['settings']['execution_timeout'] = execution_timeout
961        body['settings']['data_chan_size'] = data_chan_size
962        body['settings']['worker_queue_cap'] = worker_queue_cap
963        body['settings']['use_memory_manager'] = self.use_memory_manager
964        if execution_timeout != 3:
965            deadline_timeout = execution_timeout + 1
966        body['settings']['deadline_timeout'] = deadline_timeout
967        return body
968
969    def create_fts_index_query_compare(self):
970        """
971        Call before upgrade
972        1. creates a default index, one per bucket
973        2. Loads fts json data
974        3. Runs queries and compares the results against ElasticSearch
975        """
976        self.fts_obj = FTSCallable(nodes=self.servers, es_validate=True)
977        for bucket in self.buckets:
978            self.fts_obj.create_default_index(
979                index_name="index_{0}".format(bucket.name),
980                bucket_name=bucket.name)
981        self.fts_obj.load_data(self.num_items)
982        self.fts_obj.wait_for_indexing_complete()
983        for index in self.fts_obj.fts_indexes:
984            self.fts_obj.run_query_and_compare(index=index, num_queries=20)
985        return self.fts_obj
986
987    def update_delete_fts_data_run_queries(self, fts_obj):
988        """
989        To call after (preferably) upgrade
990        :param fts_obj: the FTS object created in create_fts_index_query_compare()
991        """
992        fts_obj.async_perform_update_delete()
993        for index in fts_obj.fts_indexes:
994            fts_obj.run_query_and_compare(index)
995
996    def delete_all_fts_artifacts(self, fts_obj):
997        """
998        Call during teardown of upgrade test
999        :param fts_obj: he FTS object created in create_fts_index_query_compare()
1000        """
1001        fts_obj.delete_all()
1002
1003    def run_fts_query_and_compare(self):
1004        try:
1005            self.log.info("Verify fts via queries again")
1006            self.update_delete_fts_data_run_queries(self.fts_obj)
1007        except Exception, ex:
1008            print ex
1009
1010    """ for cbas test """
1011    def load_sample_buckets(self, servers=None, bucketName=None,
1012                                  total_items=None, rest=None):
1013        """ Load the specified sample bucket in Couchbase """
1014        self.assertTrue(rest.load_sample(bucketName),
1015                        "Failure while loading sample bucket: {0}".format(bucketName))
1016
1017        """ check for load data into travel-sample bucket """
1018        if total_items:
1019            import time
1020            end_time = time.time() + 180
1021            while time.time() < end_time:
1022                self.sleep(20)
1023                num_actual = 0
1024                if not servers:
1025                    num_actual = self.get_item_count(self.master,bucketName)
1026                else:
1027                    bucket_maps = RestConnection(servers[0]).get_buckets_itemCount()
1028                    num_actual = bucket_maps[bucketName]
1029                if int(num_actual) == total_items:
1030                    self.log.info("{0} items are loaded in the {1} bucket"\
1031                                            .format(num_actual, bucketName))
1032                    break
1033                self.log.info("{0} items are loaded in the {1} bucket"\
1034                                           .format(num_actual, bucketName))
1035            if int(num_actual) != total_items:
1036                return False
1037        else:
1038            self.sleep(120)
1039
1040        return True
1041
1042    def execute_statement_on_cbas_via_rest(self, statement, mode=None, rest=None, timeout=120,\
1043                                             client_context_id=None, username=None, password=None):
1044        """
1045        Executes a statement on CBAS using the REST API using REST Client
1046        """
1047        pretty = "true"
1048        if not rest:
1049            rest = RestConnection(self.cbas_node)
1050        try:
1051            self.log.info("Running query on cbas: {0}".format(statement))
1052            response = rest.execute_statement_on_cbas(statement, mode, pretty,
1053                                                      timeout, client_context_id,
1054                                                      username, password)
1055            response = json.loads(response)
1056            if "errors" in response:
1057                errors = response["errors"]
1058            else:
1059                errors = None
1060
1061            if "results" in response:
1062                results = response["results"]
1063            else:
1064                results = None
1065
1066            if "handle" in response:
1067                handle = response["handle"]
1068            else:
1069                handle = None
1070
1071            return response["status"], response[
1072                "metrics"], errors, results, handle
1073
1074        except Exception,e:
1075            raise Exception(str(e))
1076
1077    def create_bucket_on_cbas(self, cbas_bucket_name, cb_bucket_name,
1078                              cb_server_ip=None,
1079                              validate_error_msg=False,
1080                              username = None, password = None):
1081        """
1082        Creates a bucket on CBAS
1083        """
1084        if cb_server_ip:
1085            cmd_create_bucket = "create bucket " + cbas_bucket_name + \
1086                              " with {\"name\":\"" + cb_bucket_name + \
1087                              "\",\"nodes\":\"" + cb_server_ip + "\"};"
1088        else:
1089            '''DP3 doesn't need to specify cb server ip as cbas node is part of the cluster.'''
1090            cmd_create_bucket = "create bucket " + cbas_bucket_name + \
1091                            " with {\"name\":\"" + cb_bucket_name + "\"};"
1092        status, metrics, errors, results, _ = \
1093                   self.execute_statement_on_cbas_via_rest(cmd_create_bucket,username=username,
1094                                                           password=password)
1095
1096        if validate_error_msg:
1097            return self.validate_error_in_response(status, errors)
1098        else:
1099            if status != "success":
1100                return False
1101            else:
1102                return True
1103
1104    def create_dataset_on_bucket(self, cbas_bucket_name, cbas_dataset_name,
1105                                 where_field=None, where_value = None,
1106                                 validate_error_msg=False, username = None,
1107                                 password = None):
1108        """
1109        Creates a shadow dataset on a CBAS bucket
1110        """
1111        cmd_create_dataset = "create shadow dataset {0} on {1};".format(
1112            cbas_dataset_name, cbas_bucket_name)
1113        if where_field and where_value:
1114            cmd_create_dataset = "create shadow dataset {0} on {1} WHERE `{2}`=\"{3}\";"\
1115                                              .format(cbas_dataset_name, cbas_bucket_name,
1116                                                      where_field, where_value)
1117        status, metrics, errors, results, _ = \
1118                        self.execute_statement_on_cbas_via_rest(cmd_create_dataset,
1119                                                                username=username,
1120                                                                password=password)
1121        if validate_error_msg:
1122            return self.validate_error_in_response(status, errors)
1123        else:
1124            if status != "success":
1125                return False
1126            else:
1127                return True
1128
1129    def test_create_dataset_on_bucket(self):
1130        # Create bucket on CBAS
1131        self.create_bucket_on_cbas(cbas_bucket_name=self.cbas_bucket_name,
1132                                   cb_bucket_name=self.cb_bucket_name,
1133                                   cb_server_ip=self.cb_server_ip)
1134
1135        # Create dataset on the CBAS bucket
1136        result = self.create_dataset_on_bucket(
1137            cbas_bucket_name=self.cbas_bucket_name_invalid,
1138            cbas_dataset_name=self.cbas_dataset_name,
1139            validate_error_msg=self.validate_error)
1140        if not result:
1141            self.fail("FAIL : Actual error msg does not match the expected")
1142
1143    def create_replica_index(self):
1144        self.log.info("create_index")
1145        self.index_list = {}
1146        self._initialize_n1ql_helper()
1147        try:
1148            self.n1ql_helper.create_primary_index(using_gsi = True,
1149                                               server = self.n1ql_server)
1150            self.log.info("done create_index")
1151        except Exception, e:
1152            self.log.info(e)
1153
1154    def create_index_with_replica_and_query(self):
1155        """ ,groups=simple,reset_services=True """
1156        self.log.info("Create index with replica and query")
1157        self.n1ql_node = self.get_nodes_from_services_map(service_type="n1ql")
1158        self._initialize_n1ql_helper()
1159        self.index_name_prefix = "random_index_" + str(randint(100000, 999999))
1160        create_index_query = "CREATE INDEX " + self.index_name_prefix + \
1161              " ON default(age) USING GSI WITH {{'num_replica': {0}}};"\
1162                                        .format(self.num_index_replicas)
1163        try:
1164            self.create_replica_index()
1165            self.n1ql_helper.run_cbq_query(query=create_index_query,
1166                                           server=self.n1ql_node)
1167        except Exception, e:
1168            self.log.info(e)
1169        self.sleep(30)
1170        index_map = self.get_index_map()
1171        self.log.info(index_map)
1172        if not self.expected_err_msg:
1173            self.n1ql_helper.verify_replica_indexes([self.index_name_prefix],
1174                                                    index_map,
1175                                                    self.num_index_replicas)
1176
1177    def verify_index_with_replica_and_query(self):
1178        index_map = self.get_index_map()
1179        try:
1180            self.n1ql_helper.verify_replica_indexes([self.index_name_prefix],
1181                                                    index_map,
1182                                                    self.num_index_replicas)
1183        except Exception, e:
1184            self.log.info(e)
1185
1186    def _initialize_n1ql_helper(self):
1187        if self.n1ql_helper == None:
1188            self.n1ql_server = self.get_nodes_from_services_map(service_type = \
1189                                              "n1ql",servers=self.input.servers)
1190            self.n1ql_helper = N1QLHelper(version = "sherlock", shell = None,
1191                use_rest = True, max_verify = self.max_verify,
1192                buckets = self.buckets, item_flag = None,
1193                n1ql_port = self.n1ql_server.n1ql_port, full_docs_list = [],
1194                log = self.log, input = self.input, master = self.master)