1import copy
2import os, shutil, ast, re, subprocess
3import json
4import urllib
5
6from basetestcase import BaseTestCase
7from couchbase_helper.data_analysis_helper import DataCollector
8from membase.helper.rebalance_helper import RebalanceHelper
9from couchbase_helper.documentgenerator import BlobGenerator,DocumentGenerator
10from ent_backup_restore.validation_helpers.backup_restore_validations \
11                                                 import BackupRestoreValidations
12from membase.helper.bucket_helper import BucketOperationHelper
13from membase.helper.cluster_helper import ClusterOperationHelper
14from remote.remote_util import RemoteMachineShellConnection
15from membase.api.rest_client import RestHelper, Bucket as CBBucket
16from couchbase_helper.document import View
17from testconstants import LINUX_COUCHBASE_BIN_PATH,\
18                          COUCHBASE_DATA_PATH, WIN_COUCHBASE_DATA_PATH_RAW,\
19                          WIN_COUCHBASE_BIN_PATH_RAW, WIN_COUCHBASE_BIN_PATH, WIN_TMP_PATH_RAW,\
20                          MAC_COUCHBASE_BIN_PATH, LINUX_ROOT_PATH, WIN_ROOT_PATH,\
21                          WIN_TMP_PATH, STANDARD_BUCKET_PORT, WIN_CYGWIN_BIN_PATH
22from testconstants import INDEX_QUOTA, FTS_QUOTA
23from membase.api.rest_client import RestConnection
24from security.rbac_base import RbacBase
25from couchbase.bucket import Bucket
26from lib.memcached.helper.data_helper import VBucketAwareMemcached, MemcachedClientHelper
27
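# Module-level templates: SOURCE_CB_PARAMS holds default source-cluster
# connection settings and INDEX_DEFINITION is a skeleton full-text index
# definition (the empty "name"/"uuid"/"params" fields are filled in by tests
# that create FTS indexes, e.g. when the create-fts-index param is set).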
28SOURCE_CB_PARAMS = {
29                      "authUser": "default",
30                      "authPassword": "",
31                      "authSaslUser": "",
32                      "authSaslPassword": "",
33                      "clusterManagerBackoffFactor": 0,
34                      "clusterManagerSleepInitMS": 0,
35                      "clusterManagerSleepMaxMS": 20000,
36                      "dataManagerBackoffFactor": 0,
37                      "dataManagerSleepInitMS": 0,
38                      "dataManagerSleepMaxMS": 20000,
39                      "feedBufferSizeBytes": 0,
40                      "feedBufferAckThreshold": 0
41                    }
42INDEX_DEFINITION = {
43                          "type": "fulltext-index",
44                          "name": "",
45                          "uuid": "",
46                          "params": {},
47                          "sourceType": "couchbase",
48                          "sourceName": "default",
49                          "sourceUUID": "",
50                          "sourceParams": SOURCE_CB_PARAMS,
51                          "planParams": {}
52                        }
53
54
55class EnterpriseBackupRestoreBase(BaseTestCase):
56    def setUp(self):
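        """
        Build the Backupset configuration from the test input parameters:
        detect the target OS, locate the Couchbase bin directory via
        /diag/eval, pick the backup/restore hosts and credentials, and
        create the local directory used to store validation files.
        """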
57        super(EnterpriseBackupRestoreBase, self).setUp()
58        """ from version 4.6.0 and later, --host flag is deprecated """
59        self.cluster_flag = "--host"
60        self.backupset = Backupset()
61        self.cmd_ext = ""
62        self.should_fail = self.input.param("should-fail", False)
63        self.database_path = COUCHBASE_DATA_PATH
64
65        cmd =  'curl -g {0}:8091/diag/eval -u {1}:{2}'.format(self.master.ip,
66                                                              self.master.rest_username,
67                                                              self.master.rest_password)
        cmd += ' -d "path_config:component_path(bin)."'
69        bin_path  = subprocess.check_output(cmd, shell=True)
70        if "bin" not in bin_path:
71            self.fail("Check if cb server install on %s" % self.master.ip)
72        else:
73            self.cli_command_location = bin_path.replace('"','') + "/"
74
75        self.debug_logs = self.input.param("debug-logs", False)
76        self.backupset.directory = self.input.param("dir", "/tmp/entbackup")
77        self.backupset.user_env = self.input.param("user-env", False)
78        self.backupset.passwd_env = self.input.param("passwd-env", False)
79        self.backupset.log_archive_env = self.input.param("log-archive-env", False)
80        self.backupset.no_log_output_flag = self.input.param("no-log-output-flag", False)
81        self.backupset.ex_logs_path = self.input.param("ex-logs-path", None)
82        self.backupset.overwrite_user_env = self.input.param("overwrite-user-env", False)
83        self.backupset.overwrite_passwd_env = self.input.param("overwrite-passwd-env", False)
84        self.replace_ttl_with = self.input.param("replace-ttl-with", None)
85        self.bk_with_ttl = self.input.param("bk-with-ttl", None)
86        self.backupset.user_env_with_prompt = \
87                        self.input.param("user-env-with-prompt", False)
88        self.backupset.passwd_env_with_prompt = \
89                        self.input.param("passwd-env-with-prompt", False)
90        shell = RemoteMachineShellConnection(self.servers[0])
91        info = shell.extract_remote_info().type.lower()
92        self.root_path = LINUX_ROOT_PATH
93        self.wget = "wget"
94        self.os_name = "linux"
95        self.tmp_path = "/tmp/"
96        self.long_help_flag = "--help"
97        self.short_help_flag = "-h"
98        self.cygwin_bin_path = ""
        self.enable_firewall = False
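        # Shell pipelines used by the replace-ttl tests: the first converts
        # "now + N seconds" into an RFC 3339 timestamp (date | xargs | sed),
        # the second returns the same expiry as epoch seconds.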
100        self.rfc3339_date = "date +%s --date='{0} seconds' | ".format(self.replace_ttl_with) + \
101                                "xargs -I {} date --date='@{}' --rfc-3339=seconds | "\
102                                "sed 's/ /T/'"
103        self.seconds_with_ttl = "date +%s --date='{0} seconds'".format(self.replace_ttl_with)
104        if info == 'linux':
105            if self.nonroot:
106                base_path = "/home/%s" % self.master.ssh_username
107                self.database_path = "%s%s" % (base_path, COUCHBASE_DATA_PATH)
108                self.root_path = "/home/%s/" % self.master.ssh_username
109        elif info == 'windows':
110            self.os_name = "windows"
111            self.cmd_ext = ".exe"
112            self.wget = "/cygdrive/c/automation/wget.exe"
113            self.database_path = WIN_COUCHBASE_DATA_PATH_RAW
114            self.root_path = WIN_ROOT_PATH
115            self.tmp_path = WIN_TMP_PATH
116            self.long_help_flag = "help"
117            self.short_help_flag = "h"
118            self.cygwin_bin_path = WIN_CYGWIN_BIN_PATH
119            self.rfc3339_date = "date +%s --date='{0} seconds' | ".format(self.replace_ttl_with) + \
120                            "{0}xargs -I {{}} date --date=\"@'{{}}'\" --rfc-3339=seconds | "\
121                                                            .format(self.cygwin_bin_path) + \
122                                                                               "sed 's/ /T/'"
123            win_format = "C:/Program Files"
124            cygwin_format = "/cygdrive/c/Program\ Files"
125            if win_format in self.cli_command_location:
126                self.cli_command_location = self.cli_command_location.replace(win_format,
127                                                                              cygwin_format)
128            self.backupset.directory = self.input.param("dir", WIN_TMP_PATH_RAW + "entbackup")
129        elif info == 'mac':
130            self.backupset.directory = self.input.param("dir", "/tmp/entbackup")
131        else:
132            raise Exception("OS not supported.")
133        self.backup_validation_files_location = "/tmp/backuprestore" + self.master.ip
134        self.backupset.backup_host = self.input.clusters[1][0]
135        self.backupset.name = self.input.param("name", "backup")
136        self.non_master_host = self.input.param("non-master", False)
137        self.compact_backup = self.input.param("compact-backup", False)
138        self.merged = self.input.param("merged", None)
139        self.expire_time = self.input.param('expire_time', 0)
140        self.after_upgrade_merged = self.input.param("after-upgrade-merged", False)
141        self.batch_size = self.input.param("batch_size", 1000)
142        self.create_gsi = self.input.param("create-gsi", False)
143        self.gsi_names = ["num1", "num2"]
144        self.enable_firewall = False
145        self.eventing_log_level = self.input.param('eventing_log_level', 'INFO')
146        self.do_restore = self.input.param("do-restore", False)
147        self.do_verify = self.input.param("do-verify", False)
148        self.create_views = self.input.param("create-views", False)
149        self.create_fts_index = self.input.param("create-fts-index", False)
150        self.cluster_new_user = self.input.param("new_user", None)
151        self.cluster_new_role = self.input.param("new_role", None)
152        self.new_replicas = self.input.param("new-replicas", None)
153        self.restore_only = self.input.param("restore-only", False)
154        self.replace_ttl = self.input.param("replace-ttl", None)
155        self.vbucket_filter = self.input.param("vbucket-filter", None)
156        self.restore_compression_mode = self.input.param("restore-compression-mode", None)
157        self.force_version_upgrade = self.input.param("force-version-upgrade", None)
158        if self.non_master_host:
159            self.backupset.cluster_host = self.servers[1]
160            self.backupset.cluster_host_username = self.servers[1].rest_username
161            self.backupset.cluster_host_password = self.servers[1].rest_password
162        else:
163            self.backupset.cluster_host = self.servers[0]
164            self.backupset.cluster_host_username = self.servers[0].rest_username
165            self.backupset.cluster_host_password = self.servers[0].rest_password
166        self.same_cluster = self.input.param("same-cluster", False)
167        self.reset_restore_cluster = self.input.param("reset-restore-cluster", True)
168        self.no_progress_bar = self.input.param("no-progress-bar", True)
169        self.multi_threads = self.input.param("multi_threads", False)
170        self.threads_count = self.input.param("threads_count", 1)
171        self.bucket_delete = self.input.param("bucket_delete", False)
172        self.bucket_flush = self.input.param("bucket_flush", False)
173        if self.same_cluster:
174            self.backupset.restore_cluster_host = self.servers[0]
175            self.backupset.restore_cluster_host_username = self.servers[0].rest_username
176            self.backupset.restore_cluster_host_password = self.servers[0].rest_password
177        else:
178            self.backupset.restore_cluster_host = self.input.clusters[0][0]
179            self.backupset.restore_cluster_host_username = self.input.clusters[0][0].rest_username
180            self.backupset.restore_cluster_host_password = self.input.clusters[0][0].rest_password
181        """ new user to test RBAC """
182        if self.cluster_new_user:
183            self.backupset.cluster_host_username = self.cluster_new_user
184            self.backupset.restore_cluster_host_username = self.cluster_new_user
185        include_buckets = self.input.param("include-buckets", "")
186        include_buckets = include_buckets.split(",") if include_buckets else []
187        exclude_buckets = self.input.param("exclude-buckets", "")
188        exclude_buckets = exclude_buckets.split(",") if exclude_buckets else []
189        self.backupset.exclude_buckets = exclude_buckets
190        self.backupset.include_buckets = include_buckets
191        self.backupset.disable_bucket_config = self.input.param("disable-bucket-config", False)
192        self.backupset.disable_views = self.input.param("disable-views", False)
193        self.backupset.disable_gsi_indexes = self.input.param("disable-gsi-indexes", False)
194        self.backupset.disable_ft_indexes = self.input.param("disable-ft-indexes", False)
195        self.backupset.disable_data = self.input.param("disable-data", False)
196        self.backupset.disable_conf_res_restriction = self.input.param("disable-conf-res-restriction", None)
197        self.backupset.force_updates = self.input.param("force-updates", False)
198        self.backupset.resume = self.input.param("resume", False)
199        self.backupset.purge = self.input.param("purge", False)
200        self.backupset.threads = self.input.param("threads", self.number_of_processors())
201        self.backupset.start = self.input.param("start", 1)
202        self.backupset.end = self.input.param("stop", 1)
203        self.backupset.number_of_backups = self.input.param("number_of_backups", 1)
204        self.backupset.number_of_backups_after_upgrade = \
205                             self.input.param("number_of_backups_after_upgrade", 0)
206        self.backupset.filter_keys = self.input.param("filter-keys", "")
207        self.backupset.random_keys = self.input.param("random_keys", False)
208        self.backupset.filter_values = self.input.param("filter-values", "")
209        self.backupset.no_ssl_verify = self.input.param("no-ssl-verify", False)
210        self.backupset.secure_conn = self.input.param("secure-conn", False)
211        self.backupset.bk_no_cert = self.input.param("bk-no-cert", False)
212        self.backupset.rt_no_cert = self.input.param("rt-no-cert", False)
213        self.backupset.backup_list_name = self.input.param("list-names", None)
214        self.backupset.backup_incr_backup = self.input.param("incr-backup", None)
215        self.backupset.bucket_backup = self.input.param("bucket-backup", None)
216        self.backupset.backup_to_compact = self.input.param("backup-to-compact", 0)
217        self.backupset.map_buckets = self.input.param("map-buckets", None)
218        self.backupset.delete_old_bucket = self.input.param("delete-old-bucket", False)
219        self.add_node_services = self.input.param("add-node-services", "kv")
        self.backupset.backup_compressed = \
                                      self.input.param("backup-compressed", False)
222        self.backups = []
        self.validation_helper = BackupRestoreValidations(self.backupset,
                                                          self.cluster_to_backup,
                                                          self.cluster_to_restore,
                                                          self.buckets,
                                                          self.backup_validation_files_location,
                                                          self.backups,
                                                          self.num_items,
                                                          self.vbuckets)
231        self.number_of_backups_taken = 0
232        self.vbucket_seqno = []
233        self.expires = self.input.param("expires", 0)
234        self.auto_failover = self.input.param("enable-autofailover", False)
235        self.auto_failover_timeout = self.input.param("autofailover-timeout", 30)
        self.graceful = self.input.param("graceful", False)
237        self.recoveryType = self.input.param("recoveryType", "full")
238        self.skip_buckets = self.input.param("skip_buckets", False)
239        self.lww_new = self.input.param("lww_new", False)
240        self.skip_consistency = self.input.param("skip_consistency", False)
241        self.master_services = self.get_services([self.backupset.cluster_host],
242                                            self.services_init, start_node=0)
243        if not self.master_services:
244            self.master_services = ["kv"]
245        self.per_node = self.input.param("per_node", True)
246        if not os.path.exists(self.backup_validation_files_location):
247            os.mkdir(self.backup_validation_files_location)
248        shell.disconnect()
249
250    def tearDown(self):
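        """
        Remove the backup archive and validation files created during the
        test and reset every cluster defined in the test input (unless
        skip_cleanup is set).
        """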
251        super(EnterpriseBackupRestoreBase, self).tearDown()
252        if not self.input.param("skip_cleanup", False):
253            remote_client = RemoteMachineShellConnection(self.input.clusters[1][0])
254            info = remote_client.extract_remote_info().type.lower()
255            self.tmp_path = "/tmp/"
256            if info == 'linux' or info == 'mac':
257                backup_directory = "/tmp/entbackup"
258            elif info == 'windows':
259                self.tmp_path = WIN_TMP_PATH
260                backup_directory = WIN_TMP_PATH_RAW + "entbackup"
261            else:
262                raise Exception("OS not supported.")
263            backup_directory += self.master.ip
264            validation_files_location = "%sbackuprestore" % self.tmp_path + self.master.ip
265            if info == 'linux':
266                command = "rm -rf {0}".format(backup_directory)
267                output, error = remote_client.execute_command(command)
268                remote_client.log_command_output(output, error)
269            elif info == 'windows':
270                remote_client.remove_directory_recursive(backup_directory)
271            if info == 'linux':
272                command = "rm -rf /cbqe3043/entbackup"
273                output, error = remote_client.execute_command(command)
274                remote_client.log_command_output(output, error)
275            if self.input.clusters:
276                for key in self.input.clusters.keys():
277                    servers = self.input.clusters[key]
278                    try:
279                        self.backup_reset_clusters(servers)
280                    except:
281                        self.log.error("was not able to cleanup cluster the first time")
282                        self.backup_reset_clusters(servers)
283            if os.path.exists(validation_files_location):
284                self.log.info("delete dir %s" % validation_files_location)
285                shutil.rmtree(validation_files_location)
286            remote_client.disconnect()
287
288    @property
289    def cluster_to_backup(self):
290        return self.get_nodes_in_cluster(self.backupset.cluster_host)
291
292    @property
293    def cluster_to_restore(self):
294        return self.get_nodes_in_cluster(self.backupset.restore_cluster_host)
295
296    def number_of_processors(self):
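        """Return the CPU count of the backup host (nproc on Linux/Mac,
        system info on Windows); used as the default --threads value."""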
297        remote_client = RemoteMachineShellConnection(self.input.clusters[1][0])
298        info = remote_client.extract_remote_info().type.lower()
299        if info == 'linux' or info == 'mac':
300            command = "nproc"
301            output, error = remote_client.execute_command(command)
302            if output:
303                return output[0]
304            else:
305                return error[0]
306        elif info == 'windows':
307            sysinfo = remote_client.get_windows_system_info()
308            numprocs = sysinfo['Processor(s)'].split(' ')
309            return numprocs[0]
310
311    def backup_reset_clusters(self, servers):
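        """Delete all buckets and remove every node from the cluster, then
        wait for ns_server to become responsive again."""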
312        BucketOperationHelper.delete_all_buckets_or_assert(servers, self)
313        ClusterOperationHelper.cleanup_cluster(servers, master=servers[0])
314        ClusterOperationHelper.wait_for_ns_servers_or_assert(servers, self)
315
316    def store_vbucket_seqno(self):
317        vseqno = self.get_vbucket_seqnos(self.cluster_to_backup,
318                                         self.buckets,
319                                         self.skip_consistency, self.per_node)
320        self.vbucket_seqno.append(vseqno)
321
322    def backup_create(self, del_old_backup=True):
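        """Run 'cbbackupmgr config' on the backup host to create the backup
        repository, optionally wiping any existing archive directory first.
        Returns the command's (output, error)."""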
323        args = "config --archive {0} --repo {1}".format(self.backupset.directory, self.backupset.name)
324        if self.backupset.exclude_buckets:
325            args += " --exclude-buckets \"{0}\"".format(",".join(self.backupset.exclude_buckets))
326        if self.backupset.include_buckets:
327            args += " --include-buckets \"{0}\"".format(",".join(self.backupset.include_buckets))
328        if self.backupset.disable_bucket_config:
329            args += " --disable-bucket-config"
330        if self.backupset.disable_views:
331            args += " --disable-views"
332        if self.backupset.disable_gsi_indexes:
333            args += " --disable-gsi-indexes"
334        if self.backupset.disable_ft_indexes:
335            args += " --disable-ft-indexes"
336        if self.backupset.disable_data:
337            args += " --disable-data"
338        remote_client = RemoteMachineShellConnection(self.backupset.backup_host)
339        command = "{0}/cbbackupmgr {1}".format(self.cli_command_location, args)
340        if del_old_backup:
            self.log.info("Remove any old backup directory before creating a new one")
342            remote_client.execute_command("rm -rf %s" % self.backupset.directory)
343        output, error = remote_client.execute_command(command)
344        remote_client.log_command_output(output, error)
345        remote_client.disconnect()
346        return output, error
347
348    def backup_create_validate(self):
349        output, error = self.backup_create()
350        if error or "Backup repository `{0}` created successfully".format(self.backupset.name) not in output[0]:
351            self.fail("Creating backupset failed.")
352        status, msg = self.validation_helper.validate_backup_create()
353        if not status:
354            self.fail(msg)
355        self.log.info(msg)
356
357    def backup_cluster(self, threads_count=1):
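        """
        Build and run a 'cbbackupmgr backup' command against the cluster
        host.  Credentials can be passed on the command line or through the
        CB_USERNAME/CB_PASSWORD environment variables, depending on the
        user-env/passwd-env test params.  On success the newest backup
        directory name is appended to self.backups.
        """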
358        url_format = ""
359        secure_port = ""
360        if self.backupset.secure_conn:
361            cacert = self.get_cluster_certificate_info(self.backupset.backup_host,
362                                                       self.backupset.cluster_host)
363            secure_port = "1"
364            url_format = "s"
365
366        user_input = "--username %s " % self.backupset.cluster_host_username
367        password_input = "--password %s " % self.backupset.cluster_host_password
368        if self.backupset.user_env and not self.backupset.overwrite_user_env:
369            user_input = ""
370        elif self.backupset.user_env_with_prompt:
            user_input = "-u "
372
373        if self.backupset.passwd_env and not self.backupset.overwrite_passwd_env:
374            password_input = ""
375        elif self.backupset.passwd_env_with_prompt:
376            password_input = "-p "
377        if "4.6" <= RestConnection(self.backupset.backup_host).get_nodes_version():
378            self.cluster_flag = "--cluster"
379
380        args = "backup --archive {0} --repo {1} {6} http{7}://{2}:{8}{3} "\
381                   "{4} {5}".format(self.backupset.directory, self.backupset.name,
382                   self.backupset.cluster_host.ip,
383                   self.backupset.cluster_host.port,
384                   user_input,
385                   password_input,
386                   self.cluster_flag, url_format,
387                   secure_port)
388        if self.backupset.no_ssl_verify:
389            args += " --no-ssl-verify"
390        if self.backupset.secure_conn:
391            if not self.backupset.bk_no_cert:
392                args += " --cacert %s" % cacert
393        if self.backupset.resume:
394            args += " --resume"
395        if self.backupset.purge:
396            args += " --purge"
397        if self.no_progress_bar:
398            args += " --no-progress-bar"
399        if self.multi_threads:
400            args += " --threads %s " % threads_count
401        if self.backupset.backup_compressed:
402            args += " --value-compression compressed"
403
404        user_env = ""
405        password_env = ""
406        if self.backupset.user_env:
407            self.log.info("set user env to Administrator")
408            user_env = "export CB_USERNAME=Administrator; "
409        if self.backupset.passwd_env:
410            self.log.info("set password env to password")
411            password_env = "export CB_PASSWORD=password; "
412        if self.backupset.user_env_with_prompt:
413            self.log.info("set username env to prompt")
414            user_env = "unset CB_USERNAME; export CB_USERNAME;"
415        if self.backupset.passwd_env_with_prompt:
416            self.log.info("set password env to prompt")
417            password_env = "unset CB_PASSWORD; export CB_PASSWORD;"
418        remote_client = RemoteMachineShellConnection(self.backupset.backup_host)
419        command = "{3} {2} {0}/cbbackupmgr {1}".format(self.cli_command_location, args,
420                                                   password_env, user_env)
421
422        output, error = remote_client.execute_command(command)
423        remote_client.log_command_output(output, error)
424
425        if error or (output and "Backup successfully completed" not in output):
426            return output, error
427        command = "ls -tr {0}/{1} | tail -1".format(self.backupset.directory, self.backupset.name)
428        o, e = remote_client.execute_command(command)
429        if o:
430            self.backups.append(o[0])
431        self.number_of_backups_taken += 1
        self.log.info("Finished taking backup with args: {0}".format(args))
433        remote_client.disconnect()
434        return output, error
435
436    def backup_cluster_validate(self, skip_backup=False, repeats=1,
437                                validate_directory_structure=True):
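        """Take a backup (unless skip_backup), list it, validate the archive
        directory structure, and store key/seqno snapshots that later
        restore validations compare against."""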
438        if not skip_backup:
439            output, error = self.backup_cluster()
440            if error or "Backup successfully completed" not in output[-1]:
441                self.fail("Taking cluster backup failed.")
442        self.backup_list()
443        if repeats < 2 and validate_directory_structure:
444            status, msg = self.validation_helper.validate_backup()
445            if not status:
446                self.fail(msg)
447            self.log.info(msg)
448        if not self.backupset.deleted_buckets:
449            if not self.backupset.force_updates:
450                self.store_vbucket_seqno()
451            self.validation_helper.store_keys(self.cluster_to_backup, self.buckets, self.number_of_backups_taken,
452                                              self.backup_validation_files_location)
453            self.validation_helper.store_latest(self.cluster_to_backup, self.buckets, self.number_of_backups_taken,
454                                                self.backup_validation_files_location)
455            self.validation_helper.store_range_json(self.buckets, self.number_of_backups_taken,
456                                                    self.backup_validation_files_location)
457
458    def backup_restore(self):
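        """
        Build and run a 'cbbackupmgr restore' command for the selected
        backup range.  Unless skip_buckets is set, missing buckets are
        created on the restore cluster first (honouring map-buckets,
        replica and compression overrides).  Also applies replace-ttl and
        vbucket-filter options when requested.  Returns (output, error).
        """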
459        if self.restore_only:
460            if self.create_fts_index:
461                self.backups.append("2017-05-18T13_40_30.842368123-07_00")
462            else:
463                self.backups.append("2017-05-18T11_55_22.009680763-07_00")
464        try:
465            backup_start = self.backups[int(self.backupset.start) - 1]
466        except IndexError:
467            backup_start = "{0}{1}".format(self.backups[-1], self.backupset.start)
468        try:
469            backup_end = self.backups[int(self.backupset.end) - 1]
470        except IndexError:
471            backup_end = "{0}{1}".format(self.backups[-1], self.backupset.end)
472        url_format = ""
473        secure_port = ""
474        if self.backupset.secure_conn:
475            cacert = self.get_cluster_certificate_info(self.backupset.backup_host,
476                                                       self.backupset.restore_cluster_host)
477            url_format = "s"
478            secure_port = "1"
479
480        user_input = "--username %s " % self.backupset.restore_cluster_host_username
481        password_input = "--password %s " % self.backupset.restore_cluster_host_password
482        if self.backupset.user_env and not self.backupset.overwrite_user_env:
483            user_input = ""
484        elif self.backupset.user_env_with_prompt:
485            user_input = "-u "
486        if self.backupset.passwd_env and not self.backupset.overwrite_passwd_env:
487            password_input = ""
488        elif self.backupset.passwd_env_with_prompt:
489            password_input = "-p "
490
491        if "4.6" <= RestConnection(self.backupset.backup_host).get_nodes_version():
492            self.cluster_flag = "--cluster"
493
494        args = "restore --archive {0} --repo {1} {2} http{9}://{3}:{10}{4} "\
495               "{5} {6} --start {7} --end {8}" \
496                               .format(self.backupset.directory,
497                                       self.backupset.name,
498                                       self.cluster_flag,
499                                       self.backupset.restore_cluster_host.ip,
500                                       self.backupset.restore_cluster_host.port,
501                                       user_input,
502                                       password_input,
503                                       backup_start, backup_end, url_format, secure_port)
504        if self.backupset.no_ssl_verify:
505            args += " --no-ssl-verify"
506        if self.backupset.secure_conn:
507            if not self.backupset.rt_no_cert:
508                args += " --cacert %s" % cacert
509        if self.backupset.exclude_buckets:
510            args += " --exclude-buckets {0}".format(self.backupset.exclude_buckets)
511        if self.backupset.include_buckets:
512            args += " --include-buckets {0}".format(self.backupset.include_buckets)
513        if self.backupset.disable_bucket_config:
514            args += " --disable-bucket-config {0}".format(self.backupset.disable_bucket_config)
515        if self.backupset.disable_views:
516            args += " --disable-views {0}".format(self.backupset.disable_views)
517        if self.backupset.disable_gsi_indexes:
518            args += " --disable-gsi-indexes {0}".format(self.backupset.disable_gsi_indexes)
519        if self.backupset.disable_ft_indexes:
520            args += " --disable-ft-indexes {0}".format(self.backupset.disable_ft_indexes)
521        if self.backupset.disable_data:
522            args += " --disable-data {0}".format(self.backupset.disable_data)
523        if self.backupset.disable_conf_res_restriction is not None:
524            args += " --disable-conf-res-restriction {0}".format(
525                self.backupset.disable_conf_res_restriction)
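        # Filter keys/values may encode '*' as "star" and '.' as "dot" in the
        # test params; substitute the real characters back before passing the
        # filters to cbbackupmgr.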
526        filter_chars = {"star": "*", "dot": "."}
527        if self.backupset.filter_keys:
528            for key in filter_chars:
529                if key in self.backupset.filter_keys:
530                    self.backupset.filter_keys = self.backupset.filter_keys.replace(key,
531                                                                      filter_chars[key])
532            args += " --filter-keys '{0}'".format(self.backupset.filter_keys)
533        if self.backupset.filter_values:
534            for key in filter_chars:
535                if key in self.backupset.filter_values:
536                    self.backupset.filter_values = self.backupset.filter_values.replace(key,
537                                                                          filter_chars[key])
538            args += " --filter-values '{0}'".format(self.backupset.filter_values)
539        if self.backupset.force_updates:
540            args += " --force-updates"
541        if self.no_progress_bar:
542            args += " --no-progress-bar"
543        bucket_compression_mode = self.compression_mode
544        if self.restore_compression_mode is not None:
545            bucket_compression_mode = self.restore_compression_mode
546        if not self.skip_buckets:
547            rest_conn = RestConnection(self.backupset.restore_cluster_host)
548            rest_helper = RestHelper(rest_conn)
            total_mem = rest_conn.get_nodes_self().mcdMemoryReserved
            ram_size = total_mem * 2 / 3
551            has_index_node = False
552            if "index" in self.master_services[0]:
553                has_index_node = True
554                ram_size = int(ram_size) - INDEX_QUOTA
555            if "fts" in self.master_services[0]:
556                ram_size = int(ram_size) - FTS_QUOTA
557            all_buckets = self.total_buckets
558            if len(self.buckets) > 0:
559                all_buckets = len(self.buckets)
560            bucket_size = self._get_bucket_size(ram_size, all_buckets)
561            if self.dgm_run:
562                bucket_size = 256
563
564            count = 0
565            buckets = []
566            replicas = self.num_replicas
567            if self.new_replicas:
568                replicas = self.new_replicas
569            for bucket in self.buckets:
570                bucket_name = bucket.name
571                if not rest_helper.bucket_exists(bucket_name):
572                    if self.backupset.map_buckets is None:
573                        self.log.info("Creating bucket {0} in restore host {1}"
574                                            .format(bucket_name,
575                                            self.backupset.restore_cluster_host.ip))
576                    elif self.backupset.map_buckets:
                        self.log.info("Create a new bucket name to restore this bucket into")
578                        bucket_maps = ""
579                        bucket_name = bucket.name + "_" + str(count)
580                    if self.bucket_type == "ephemeral":
581                        self.eviction_policy = "noEviction"
                        self.log.info("ephemeral bucket requires the restore cluster "
                                      "GSI storage mode to be memory optimized.")
584                        self.test_storage_mode = "memory_optimized"
585                        self._reset_storage_mode(rest_conn, self.test_storage_mode)
586
587                    self.log.info("replica in bucket {0} is {1}".format(bucket.name, replicas))
588                    rest_conn.create_bucket(bucket=bucket_name,
589                                    ramQuotaMB=int(bucket_size) - 1,
590                                    replicaNumber=replicas,
591                                    authType=bucket.authType if bucket.authType else 'none',
592                                    bucketType=self.bucket_type,
593                                    proxyPort=bucket.port,
594                                    evictionPolicy=self.eviction_policy,
595                                    lww=self.lww_new,
596                                    compressionMode=bucket_compression_mode)
597                    bucket_ready = rest_helper.vbucket_map_ready(bucket_name)
598                    if not bucket_ready:
599                        self.fail("Bucket {0} not created after 120 seconds.".format(bucket_name))
600                    if has_index_node:
601                        self.sleep(5, "wait for index service ready")
602                elif self.backupset.map_buckets and self.same_cluster:
603                    bucket_maps = ""
604                    bucket_name = bucket.name + "_" + str(count)
605                    self.log.info("replica in bucket {0} is {1}".format(bucket_name, replicas))
606                    if self.backupset.delete_old_bucket:
607                        BucketOperationHelper.delete_bucket_or_assert(\
608                                       self.backupset.restore_cluster_host, bucket.name, self)
609                    rest_conn.create_bucket(bucket=bucket_name,
610                                    ramQuotaMB=int(bucket_size) - 1,
611                                    replicaNumber=replicas,
612                                    authType=bucket.authType if bucket.authType else 'none',
613                                    bucketType=self.bucket_type,
614                                    proxyPort=bucket.port,
615                                    evictionPolicy=self.eviction_policy,
616                                    lww=self.lww_new,
617                                    compressionMode=bucket_compression_mode)
618                    bucket_ready = rest_helper.vbucket_map_ready(bucket_name)
619                    if not bucket_ready:
620                        self.fail("Bucket {0} not created after 120 seconds.".format(bucket_name))
621                buckets.append("%s=%s" % (bucket.name, bucket_name))
                count += 1
623            bucket_maps = ",".join(buckets)
624        if self.backupset.map_buckets:
625            args += " --map-buckets %s " % bucket_maps
626        user_env = ""
627        password_env = ""
628        if self.backupset.user_env:
629            self.log.info("set user env to Administrator")
630            user_env = "export CB_USERNAME=Administrator; "
631        if self.backupset.passwd_env:
632            self.log.info("set password env to password")
633            password_env = "export CB_PASSWORD=password; "
634        if self.backupset.user_env_with_prompt:
635            self.log.info("set username env to prompt")
636            user_env = "unset CB_USERNAME; export CB_USERNAME;"
637        if self.backupset.passwd_env_with_prompt:
638            self.log.info("set password env to prompt")
639            password_env = "unset CB_PASSWORD; export CB_PASSWORD;"
640        shell = RemoteMachineShellConnection(self.backupset.backup_host)
641
642        self.ttl_value = ""
643        if self.replace_ttl is not None:
644            if self.replace_ttl == "all" or self.replace_ttl == "expired":
645                if self.replace_ttl_with is None:
646                    self.fail("Need to include param 'replace-ttl-with' value")
647                else:
648                    if self.replace_ttl_with == 0:
649                        self.ttl_value = "0"
650                        args += " --replace-ttl {0} --replace-ttl-with 0"\
651                                                 .format(self.replace_ttl)
652                    else:
653                        ttl_date, _ = shell.execute_command(self.rfc3339_date)
654                        self.seconds_with_ttl, _ = shell.execute_command(self.seconds_with_ttl)
655                        if self.seconds_with_ttl:
656                            self.ttl_value = self.seconds_with_ttl[0]
657                        if ttl_date and ttl_date[0]:
658                            args += " --replace-ttl {0} --replace-ttl-with {1}"\
659                                         .format(self.replace_ttl, ttl_date[0])
660                        elif isinstance(self.replace_ttl_with, str):
661                            args += " --replace-ttl {0} --replace-ttl-with {1}"\
662                                     .format(self.replace_ttl, self.replace_ttl_with)
663            elif self.replace_ttl == "add-none":
664                args += " --replace-ttl none"
665            elif self.replace_ttl == "empty-flag":
666                args += " --replace-ttl-with {0}".format(self.replace_ttl_with)
667            else:
668                args += " --replace-ttl {0}".format(self.replace_ttl)
669        if self.vbucket_filter:
670            if self.vbucket_filter == "empty":
671                args += " --vbucket-filter "
672            elif self.vbucket_filter == "all":
673                all_vbuckets = "0"
                for x in range(1, 1024):
675                    all_vbuckets += "," + str(x)
676                args += " --vbucket-filter {0}".format(all_vbuckets)
677            else:
678                args += " --vbucket-filter {0}".format(self.vbucket_filter)
679
680        command = "{3} {2} {0}/cbbackupmgr {1}".format(self.cli_command_location, args,
681                                                   password_env, user_env)
682        output, error = shell.execute_command(command)
683        shell.log_command_output(output, error)
684        self._verify_bucket_compression_mode(bucket_compression_mode)
        errors_check = ["Unable to process value for", "Error restoring cluster",
                        "Expected argument for option"]
        if output and any(err in output[0] for err in errors_check):
689            if not self.should_fail:
690                self.fail("Failed to restore cluster")
691            else:
692                self.log.info("This test is for negative test")
693        res = output
694        res.extend(error)
695        error_str = "Error restoring cluster: Transfer failed. "\
696                    "Check the logs for more information."
697        if error_str in res:
698            command = "cat " + self.backupset.directory + \
699                      "/logs/backup.log | grep '" + error_str + "' -A 10 -B 100"
700            output, error = shell.execute_command(command)
701            shell.log_command_output(output, error)
702        if 'Required Flags:' in res:
703            if not self.should_fail:
704                self.fail("Command line failed. Please check test params.")
705        shell.disconnect()
706        return output, error
707
708    def backup_restore_validate(self, compare_uuid=False,
709                                seqno_compare_function="==",
710                                replicas=False, mode="memory",
711                                expected_error=None):
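        """
        Run backup_restore() and validate the result: either assert that an
        expected error appears in the output, or check the restore logs and
        compare restored data/seqnos (or TTL / vbucket-filter behaviour)
        against the stored backup snapshots.
        """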
        output, error = self.backup_restore()
        if expected_error:
            output.extend(error)
            error_found = False
            for line in output:
                if self.debug_logs:
                    print "output from cmd: ", line
                    print "expect error   : ", expected_error
                if line.find(expected_error) != -1:
                    error_found = True
                    break
724            self.assertTrue(error_found, "Expected error not found: %s" % expected_error)
725            return
726        remote_client = RemoteMachineShellConnection(self.backupset.backup_host)
727        command = "grep 'Transfer plan finished successfully' " + self.backupset.directory + "/logs/backup.log"
728        output, error = remote_client.execute_command(command)
729        remote_client.log_command_output(output, error)
730        if not output:
731            self.fail("Restoring backup failed.")
732        command = "grep 'Transfer failed' " + self.backupset.directory + "/logs/backup.log"
733        output, error = remote_client.execute_command(command)
734        remote_client.log_command_output(output, error)
735        if output:
736            self.fail("Restoring backup failed.")
737
738        self.log.info("Finished restoring backup")
739        self.log.info("Get current vseqno on node %s " % self.cluster_to_restore[0].ip )
740
741        """ Add built-in user cbadminbucket to second cluster """
742        self.add_built_in_server_user(node=self.input.clusters[0][:self.nodes_init][0])
743        current_vseqno = {}
744        if not self.backupset.force_updates:
745            current_vseqno = self.get_vbucket_seqnos(self.cluster_to_restore, self.buckets,
746                                                 self.skip_consistency, self.per_node)
747        self.log.info("*** Start to validate the restore ")
748        if self.replace_ttl_with:
749            if int(self.replace_ttl_with) < 120 and int(self.replace_ttl_with) != 0:
                self.fail("Need a 'replace-ttl-with' value of at least 120 seconds "
                          "(or 0) to allow verification before the keys expire")
            if int(self.replace_ttl_with) > 14400 or int(self.replace_ttl_with) == 0:
                self.log.info("TTL is 0 or set to more than 4 hours.  No need to sleep")
753        if self.vbucket_filter and self.vbucket_filter != "empty":
754            self._validate_restore_vbucket_filter()
755        elif self.replace_ttl == "all" or self.replace_ttl == "expired":
756            self._validate_restore_replace_ttl_with(self.ttl_value)
757        else:
758            status, msg = self.validation_helper.validate_restore(self.backupset.end,
759                                                  self.vbucket_seqno, current_vseqno,
760                                                  compare_uuid=compare_uuid,
761                                                  compare=seqno_compare_function,
762                                                  get_replica=replicas, mode=mode)
763
764            """ limit the length of message printout to 3000 chars """
765            info = str(msg)[:3000] + '..' if len(str(msg)) > 3000 else msg
766            if not status:
767                self.fail(info)
768            self.log.info(info)
769        remote_client.disconnect()
770
771    def backup_list(self):
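        """Run 'cbbackupmgr list' on the archive (optionally scoped to a
        repo, backup or bucket).  Returns (status, output, message)."""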
772        args = "list --archive {0}".format(self.backupset.directory)
773        if self.backupset.backup_list_name:
774            args += " --repo {0}".format(self.backupset.backup_list_name)
775        if self.backupset.backup_incr_backup:
776            args += " --backup {0}".format(self.backupset.backup_incr_backup)
777        if self.backupset.bucket_backup:
778            args += " --bucket {0}".format(self.backupset.bucket_backup)
779        remote_client = RemoteMachineShellConnection(self.backupset.backup_host)
780        command = "{0}/cbbackupmgr {1}".format(self.cli_command_location, args)
781        output, error = remote_client.execute_command(command)
782        remote_client.log_command_output(output, error)
783        remote_client.disconnect()
784        if error:
785            return False, error, "Getting backup list failed."
786        else:
787            return True, output, "Backup list obtained"
788
789    def backup_compact(self):
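        """Run 'cbbackupmgr compact' on the backup selected by
        backup_to_compact.  Returns (status, output, message)."""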
790        args = "compact --archive {0} --repo {1} --backup {2}".format(self.backupset.directory, self.backupset.name,
791                                                                  self.backups[self.backupset.backup_to_compact])
792        remote_client = RemoteMachineShellConnection(self.backupset.backup_host)
793        command = "{0}/cbbackupmgr {1}".format(self.cli_command_location, args)
794        output, error = remote_client.execute_command(command)
795        remote_client.log_command_output(output, error)
        remote_client.disconnect()
        if "Compaction succeeded," not in output[0]:
            return False, output, "Compacting backup failed."
        else:
            return True, output, "Compaction of backup success"
801
802    def backup_remove(self):
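        """Run 'cbbackupmgr remove' to delete the backup repository, then
        verify cluster stats.  Returns (status, output, message)."""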
803        args = "remove --archive {0} --repo {1}".format(self.backupset.directory, self.backupset.name)
804        remote_client = RemoteMachineShellConnection(self.backupset.backup_host)
805        command = "{0}/cbbackupmgr {1}".format(self.cli_command_location, args)
806        output, error = remote_client.execute_command(command)
807        remote_client.log_command_output(output, error)
808        remote_client.disconnect()
809        self.verify_cluster_stats()
810        if error:
811            return False, error, "Removing backup failed."
812        else:
813            return True, output, "Removing of backup success"
814
815    def backup_list_validate(self):
816        status, output, message = self.backup_list()
817        if not status:
818            self.fail(message)
819        status, message = self.validation_helper.validate_backup_list(output)
820        if not status:
821            self.fail(message)
822        self.log.info(message)
823
824    def backup_compact_validate(self):
825        self.log.info("Listing backup details before compact")
826        status, output_before_compact, message = self.backup_list()
827        if not status:
828            self.fail(message)
829        status, output, message = self.backup_compact()
830        if not status:
831            self.fail(message)
832        self.log.info("Listing backup details after compact")
833        status, output_after_compact, message = self.backup_list()
834        if not status:
835            self.fail(message)
836        status, message = self.validation_helper.validate_compact_lists(output_before_compact, output_after_compact)
837        if not status:
838            self.fail(message)
839        self.log.info(message)
840
841    def backup_compact_deleted_keys_validation(self, delete_keys):
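        """After compaction, dump shard_0.fdb with forestdb_dump and verify
        that every key in delete_keys is still present and marked deleted.
        Returns a dict mapping each key to its KV store name and status."""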
842        self.log.info("Check deleted keys status in file after compact")
843        conn = RemoteMachineShellConnection(self.backupset.backup_host)
844        output, error = conn.execute_command("ls %s/backup/201*/default*/data "\
845                                                     % self.backupset.directory)
846        deleted_key_status = {}
847        if "shard_0.fdb" in output:
848            cmd = "%sforestdb_dump%s --plain-meta --no-body "\
849                  "%s/backup/201*/default*/data/shard_0.fdb | grep -A 6 ent-backup "\
850                                         % (self.cli_command_location, self.cmd_ext,\
851                                         self.backupset.directory)
852            dump_output, error = conn.execute_command(cmd)
853            if dump_output:
854                key_ids = [x.split(":")[1].strip(' ') for x in dump_output[0::8]]
855                miss_keys = [x for x in delete_keys if x not in key_ids]
856                if miss_keys:
                    raise Exception("Lost some keys %s" % miss_keys)
858                partition_ids =  [x.split(":")[1].strip(' ') for x in dump_output[1::8]]
859                status_ids =     [x.split(" ")[-3].strip(' ') for x in dump_output[6::8]]
860                for idx, key in enumerate(key_ids):
861                    deleted_key_status[key] = \
862                           {"KV store name":partition_ids[idx], "Status":status_ids[idx]}
863                    if status_ids[idx] != "deleted":
864                        raise Exception("key %s status was not deleted. " % key)
865            else:
                raise Exception("backup compaction failed to keep deleted docs in the file")
        else:
            raise Exception("file shard_0.fdb was not created")
869        conn.disconnect()
870        return deleted_key_status
871
872    def backup_merge(self):
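        """Run 'cbbackupmgr merge' over the start..end backup range, then
        update self.backups and number_of_backups_taken to reflect the
        single merged backup.  Returns (status, output, message)."""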
873        self.log.info("backups before merge: " + str(self.backups))
874        self.log.info("number_of_backups_taken before merge: " \
875                                                   + str(self.number_of_backups_taken))
876        if self.backupset.deleted_backups:
877            self.backupset.end -= len(self.backupset.deleted_backups)
878        try:
879            backup_start = self.backups[int(self.backupset.start) - 1]
880        except IndexError:
881            backup_start = "{0}{1}".format(self.backups[-1], self.backupset.start)
882        try:
883            backup_end = self.backups[int(self.backupset.end) - 1]
884        except IndexError:
885            backup_end = "{0}{1}".format(self.backups[-1], self.backupset.end)
886        args = "merge --archive {0} --repo {1} --start {2} --end {3}"\
887                                      .format(self.backupset.directory,
888                                              self.backupset.name,
889                                              backup_start, backup_end)
890        remote_client = RemoteMachineShellConnection(self.backupset.backup_host)
891        command = "{0}/cbbackupmgr {1}".format(self.cli_command_location, args)
892        output, error = remote_client.execute_command(command)
893        remote_client.log_command_output(output, error)
894        if error:
895            return False, error, "Merging backup failed"
896        elif output and "Merge completed successfully" not in output[0]:
897            return False, output, "Merging backup failed"
898        elif not output:
            self.log.info("process cbbackupmgr may have been killed")
            return False, [], "cbbackupmgr may have been killed"
901        del self.backups[self.backupset.start - 1:self.backupset.end]
902        command = "ls -tr {0}/{1} | tail -1".format(self.backupset.directory,
903                                                    self.backupset.name)
904        o, e = remote_client.execute_command(command)
905        if o:
906            self.backups.insert(self.backupset.start - 1, o[0])
907        self.number_of_backups_taken -= (self.backupset.end - self.backupset.start + 1)
908        self.number_of_backups_taken += 1
909        self.log.info("backups after merge: " + str(self.backups))
910        self.log.info("number_of_backups_taken after merge: "
911                                                   + str(self.number_of_backups_taken))
912        remote_client.disconnect()
913        return True, output, "Merging backup succeeded"
914
915    def backup_merge_validate(self, repeats=1, skip_validation=False):
916        status, output, message = self.backup_merge()
917        if not status:
918            self.fail(message)
919
920        if repeats < 2 and not skip_validation:
921            self.validation_helper.store_keys(self.cluster_to_backup, self.buckets, self.number_of_backups_taken,
922                                              self.backup_validation_files_location)
923            self.validation_helper.store_latest(self.cluster_to_backup, self.buckets, self.number_of_backups_taken,
924                                                self.backup_validation_files_location)
925            self.validation_helper.store_range_json(self.buckets, self.number_of_backups_taken,
926                                                    self.backup_validation_files_location, merge=True)
927
928            self.validation_helper.validate_merge(self.backup_validation_files_location)
929
930    def validate_backup_data(self, server_host, server_bucket, master_key,
931                                   perNode, getReplica, mode, items, key_check,
932                                   validate_keys=False,
933                                   regex_pattern=None):
934        """
935            Compare data in backup file with data in bucket
936        """
937        data_matched = True
938        data_collector = DataCollector()
939        bk_file_data, _ = data_collector.get_kv_dump_from_backup_file(server_host,
940                                      self.cli_command_location, self.cmd_ext,
941                                      self.backupset.directory, master_key,
942                                      self.buckets)
943        restore_file_data = bk_file_data
944        regex_backup_data = {}
945        if regex_pattern is not None:
946            pattern = re.compile("%s" % regex_pattern)
947            for bucket in self.buckets:
948                regex_backup_data[bucket.name] = {}
949                self.log.info("Extract keys with regex pattern '%s' either in key or body"
950                                                                          % regex_pattern)
951                for key in restore_file_data[bucket.name]:
952                    if self.debug_logs: print "key in backup file of bucket %s:  %s" \
953                                                               % (bucket.name, key)
954                    if validate_keys:
955                        if pattern.search(key):
956                            regex_backup_data[bucket.name][key] = \
957                                     restore_file_data[bucket.name][key]
958                        if self.debug_logs:
959                            print "\nKeys in backup file of bucket %s that matches "\
960                                        "pattern '%s'" % (bucket.name, regex_pattern)
961                            for x in regex_backup_data[bucket.name]:
962                                print x
963                    else:
964                        if self.debug_logs:
965                            print "value of key in backup file  ",\
966                                                      restore_file_data[bucket.name][key]
967                        if pattern.search(restore_file_data[bucket.name][key]["Value"]):
968                            regex_backup_data[bucket.name][key] = \
969                                         restore_file_data[bucket.name][key]
970                        if self.debug_logs:
971                            print "\nKeys and value in backup file of bucket %s "\
972                                  " that matches pattern '%s'" \
973                                    % (bucket.name, regex_pattern)
974                            for x in regex_backup_data[bucket.name]:
975                                print "key: ", x
976                                print "value: ", regex_backup_data[bucket.name][x]["Value"]
977                restore_file_data = regex_backup_data
978
979        buckets_data = {}
980        for bucket in self.buckets:
981            headerInfo, bucket_data = data_collector.collect_data(server_bucket, [bucket],
982                                                              perNode=False,
983                                                              getReplica=getReplica,
984                                                              mode=mode)
985            buckets_data[bucket.name] = bucket_data[bucket.name]
986            to_element = 5
987            if self.backupset.filter_values:
988                to_element = 7
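            # Collected values are comma-separated records; keep only the document
            # body field(s) (index 4, or 4-6 when filter_values is set) so they line
            # up with the values dumped from the backup file.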
989            for key in buckets_data[bucket.name]:
990                value = buckets_data[bucket.name][key]
991                if self.backupset.random_keys:
992                    value = ",".join(value.split(',')[4:to_element])
993                    value = value.replace('""', '"')
994                    if value.startswith('"') and value.endswith('"'):
995                        value = value[1:-1]
996                else:
997                    value = ",".join(value.split(',')[4:5])
998                buckets_data[bucket.name][key] = value
999            self.log.info("*** Compare data in bucket and in backup file of bucket %s ***"
1000                                                                            % bucket.name)
1001            failed_persisted_bucket = []
1002            ready = RebalanceHelper.wait_for_stats_on_all(self.backupset.cluster_host,
1003                                                          bucket.name, 'ep_queue_size',
1004                                                          0, timeout_in_seconds=120)
1005            if not ready:
1006                failed_persisted_bucket.append(bucket.name)
1007            if failed_persisted_bucket:
1008                self.fail("Buckets %s did not persisted." % failed_persisted_bucket)
1009            count = 0
1010            key_count = 0
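            # Compare every key; log up to 20 mismatches before raising.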
1011            for key in buckets_data[bucket.name]:
1012                if restore_file_data[bucket.name]:
1013                    if buckets_data[bucket.name][key] != restore_file_data[bucket.name][key]["Value"]:
1014                        if count < 20:
1015                            self.log.error("Data does not match at key %s. bucket: %s != %s file"
1016                                           % (key, buckets_data[bucket.name][key],
1017                                              restore_file_data[bucket.name][key]["Value"]))
1018                            data_matched = False
1019                            count += 1
1020                        else:
1021                            raise Exception ("Data not match in backup bucket %s" % bucket.name)
1022                    key_count += 1
1023                else:
1024                    raise Exception("Database file is empty")
            if len(restore_file_data[bucket.name]) != key_count:
                raise Exception("Total key counts do not match.  Backup %s != %s bucket"
                                % (len(restore_file_data[bucket.name]), key_count))
1028            self.log.info("******** Data macth in backup file and bucket %s ******** "
1029                                                                        % bucket.name)
1030            print "Bucket: ", bucket.name
1031            print "Total items in backup file:   ", len(bk_file_data[bucket.name])
1032            if regex_pattern is not None:
1033                print "Total items to be restored with regex pattern '%s' is %s "\
1034                                         % (regex_pattern,len(restore_file_data[bucket.name]))
1035            print "Total items in bucket should be:   ", key_count
1036            rest = RestConnection(server_bucket[0])
1037            actual_keys = rest.get_active_key_count(bucket.name)
1038            print "Total items actual in bucket:      ", actual_keys
1039            if actual_keys != key_count:
1040                self.fail("Total keys matched: %s != Total keys in bucket %s: %s" \
1041                          "at node %s " % (key_count, bucket.name, actual_keys,
1042                                           server_bucket[0]))
1043            if self.merged:
1044                if key_check:
1045                    self.log.info("Check if deleted keys still in backup after merged" )
1046                    for key in restore_file_data[bucket.name]:
1047                        if key == key_check:
1048                            raise Exception ("There is an old key after delete bucket,"
1049                                         " backup and merged ")
1050                    self.log.info("No deleted keys in backup after merged")
1051        return data_matched
1052
1053    def get_info_in_database(self, server, bucket, text_search):
1054        """
1055            Get info from database file like couch.1
1056        """
1057        shell = RemoteMachineShellConnection(server)
1058        cmd = "%s/couch_dbdump " % self.cli_command_location
1059        """ since there is no load, it should have only one file per vbucket """
1060        output, error = shell.execute_command("%s %s/%s/0.couch.* | grep deleted"
1061                                               % (cmd, self.database_path,
1062                                                  bucket.name))
1063        found = False
1064        if output:
1065            self.log.info("Search for '%s' in database info" % text_search)
1066            for x in output:
1067                if text_search in x:
1068                    self.log.info("*** Found %s in database %s" % (text_search, x))
1069                    found = True
1070                    break
1071        else:
1072            self.log.info("output is empty")
1073        shell.disconnect()
1074        return found
1075
1076    def validate_help_content(self, output, source):
1077        """
            Compare the content of the output list with the sample source list
1079        """
        for x in range(0, len(source)):
            if not isinstance(output[x], str):
                continue
            output[x] = output[x].strip()
            output[x] = " ".join(output[x].split())

            source[x] = source[x].strip()
            source[x] = " ".join(source[x].split())
1088            if self.debug_logs:
1089                print "\noutput:  ", repr(output[x])
1090                print "source:  ", repr(source[x])
1091            if output[x] != source[x]:
1092                self.log.error("Element %s in output did not match " % x)
1093                self.log.error("Output => %s != %s <= Source" % (output[x], source[x]))
1094                raise Exception("Content does not match "
1095                                "Output => %s != %s <= Source" % (output[x], source[x]))
1096
1097    def get_cluster_certificate_info(self, server_host, server_cert):
1098        """
1099            This will get certificate info from cluster
1100        """
1101        cert_file_location = self.root_path + "cert.pem"
1102        if self.os_name == "windows":
1103            cert_file_location = WIN_TMP_PATH_RAW + "cert.pem"
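        # ssl-manage --cluster-cert-info prints the cluster CA certificate; redirect
        # it to cert_file_location on that host and return the path to the caller.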
1104        shell = RemoteMachineShellConnection(server_host)
1105        cmd = "%s/couchbase-cli ssl-manage -c %s:8091 -u Administrator -p password "\
1106              " --cluster-cert-info > %s" % (self.cli_command_location,
1107                                                     server_cert.ip,
1108                                                     cert_file_location)
1109        output, _ = shell.execute_command(cmd)
1110        if output and "Error" in output[0]:
1111            self.fail("Failed to get CA certificate from cluster.")
1112        shell.disconnect()
1113        return cert_file_location
1114
1115    def _create_views(self):
1116        default_map_func = "function (doc) {\n  emit(doc._id, doc);\n}"
1117        default_view_name = "test"
1118        default_ddoc_name = "ddoc_test"
1119        prefix = "dev_"
1120        query = {"full_set": "true", "stale": "false", "connection_timeout": 60000}
1121        view = View(default_view_name, default_map_func)
1122        task = self.cluster.async_create_view(self.backupset.cluster_host,
1123                                              default_ddoc_name, view, "default")
1124        task.result()
1125
1126    def validate_backup_views(self, server_host):
1127        """
            Verify views are backed up
1129        """
1130        data_collector = DataCollector()
1131        bk_views_def = data_collector.get_views_definition_from_backup_file(server_host,
1132                                                               self.backupset.directory,
1133                                                               self.buckets)
1134        def_check = ['"id": "_design/dev_ddoc_test"',
1135            '"json": { "views": { "test": { "map": "function (doc) {\\n  emit(doc._id, doc);\\n}" } } }']
        if bk_views_def:
            self.log.info("Validate views function")
            for x in def_check:
                if x not in bk_views_def:
                    return False, "Missing %s in views definition" % x
            return True, "Views function validated"
        return False, "Views definition not found in backup file"
1142
1143    def get_database_file_info(self):
1144        """
1145           Extract database file size and items from backup repo
           :return: Database file information
1147        """
1148        status, output, message = self.backup_list()
1149        file_info = {}
1150        unit_size = ["MB", "GB"]
1151        if status:
1152            if output:
1153                for x in output:
1154                    if "shard_0.fdb" in x:
1155                        if x.strip().split()[0][-2:] in unit_size:
1156                            file_info["file_size"] = \
1157                                      int(x.strip().split()[0][:-2].split(".")[0])
1158
1159                            file_info["items"] = int(x.strip().split()[1])
1160                        print "output content   ", file_info
1161            return file_info
1162        else:
1163            print message
1164
1165    def validate_backup_compressed_file(self, no_compression, with_compression):
1166        """
            Compare backup file size and item count before and after using flag --value-compression compressed
1168        :return: None
1169        """
1170        compressed = False
1171        if no_compression["items"] == with_compression["items"]:
1172            if no_compression["file_size"] > with_compression["file_size"]:
1173                compressed = True
1174                self.log.info("no compressed: %d, with compressed: %d"
1175                                      % (no_compression["file_size"],
1176                                         with_compression["file_size"]))
1177            else:
1178                self.log.info("no compressed: %d, with compressed: %d"
1179                              % (no_compression["file_size"],
1180                                 with_compression["file_size"]))
1181                self.fail("cbbackupmgr failed to compress database")
1182
1183        else:
1184            self.fail("Item miss match on 2 backup files")
1185
1186    def create_indexes(self):
1187        gsi_type = "memory_optimized"
1188        rest = RestConnection(self.backupset.cluster_host)
1189        username = self.master.rest_username
1190        password = self.master.rest_password
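        # Indexes are created via cbindex; 5.x clusters use the plasma storage
        # mode instead of memory_optimized, and credentials are passed with -auth.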
1191        if "5" <= rest.get_nodes_version()[:1]:
1192            gsi_type = "plasma"
1193        gsi_type += " -auth {0}:{1} ".format(username, password)
1194        cmd = "cbindex -type create -bucket default -using {0} -index ".format(gsi_type)
1195        cmd += "{0} -fields=Num1".format(self.gsi_names[0])
1196        shell = RemoteMachineShellConnection(self.backupset.cluster_host)
1197        command = "{0}/{1}".format(self.cli_command_location, cmd)
1198        self.log.info("Create gsi indexes")
1199        output, error = shell.execute_command(command)
1200        self.sleep(5)
1201
1202        if self.debug_logs:
1203            self.log.info("\noutput gsi: {0}".format(output))
1204        cmd = "cbindex -type create -bucket default -using {0} -index ".format(gsi_type)
1205        cmd += "{0} -fields=Num2".format(self.gsi_names[1])
1206        command = "{0}/{1}".format(self.cli_command_location, cmd)
1207        shell.execute_command(command)
1208        self.sleep(5)
1209        shell.disconnect()
1210
1211    def verify_gsi(self):
1212        if not self.create_gsi:
1213            self.log.info("No GSI created.  Skip verify")
1214            return
1215        rest = RestConnection(self.backupset.cluster_host)
1216        buckets = rest.get_buckets()
1217        if "5" <= rest.get_nodes_version()[:1]:
1218            self.add_built_in_server_user(node=self.backupset.cluster_host)
1219        cmd = "cbindex -type list"
1220        if "5" <= rest.get_nodes_version()[:1]:
1221            cmd += " -auth %s:%s" % (self.backupset.cluster_host.rest_username,
1222                                     self.backupset.cluster_host.rest_password)
1223        shell = RemoteMachineShellConnection(self.backupset.restore_cluster_host)
1224        command = "{0}/{1}".format(self.cli_command_location, cmd)
1225        output, error = shell.execute_command(command)
1226        shell.log_command_output(output, error)
1227
1228        for bucket in buckets:
1229            index_found = 0
1230            if len(output) > 1:
1231                for name in self.gsi_names:
1232                    mesg = "GSI index {0} created in restore cluster as expected".format(name)
1233                    for x in output:
1234                        if "Index:{0}/{1}".format(bucket.name, name) in x:
1235                            index_found += 1
1236                            self.log.info(mesg)
1237            if index_found < len(self.gsi_names):
1238                self.fail("GSI indexes are not restored in bucket {0}".format(bucket.name))
1239        shell.disconnect()
1240
1241    def _check_output(self, word_check, output):
1242        found = False
        if len(output) >= 1:
1244            for x in output:
1245                if word_check.lower() in x.lower():
1246                    self.log.info("Found \"%s\" in CLI output" % word_check)
1247                    found = True
1248                    break
1249        return found
1250
1251    def _reset_storage_mode(self, rest, storageMode):
1252        nodes_in_cluster = rest.get_nodes()
1253        for node in nodes_in_cluster:
1254            RestConnection(node).force_eject_node()
1255        rest.set_indexer_storage_mode(username='Administrator',
1256                                          password='password',
1257                                          storageMode=storageMode)
1258        rest.init_node()
1259
1260    def _collect_logs(self):
1261        """
           CB_ARCHIVE_PATH env usage is controlled by param log-archive-env (default False)
1263           syntax: cbbackupmgr collect-logs -a /tmp/backup -o /tmp/envlogs
1264        """
1265        shell = RemoteMachineShellConnection(self.backupset.backup_host)
1266        info = shell.extract_remote_info().type.lower()
1267
1268        args = " collect-logs -a {0}".format(self.backupset.directory)
1269        if self.backupset.ex_logs_path:
1270            if info == 'windows':
1271                if "tmp" in self.backupset.ex_logs_path:
1272                    self.fail("It must use other name for ex logs dir in windows")
1273                self.backupset.ex_logs_path = "/cygdrive/c/" + \
1274                                              self.backupset.ex_logs_path
1275                if "relativepath" in self.backupset.ex_logs_path:
1276                    self.backupset.ex_logs_path = \
1277                        self.backupset.ex_logs_path.replace("/cygdrive/c/", "~/")
1278            else:
1279                self.backupset.ex_logs_path = "/tmp/" + self.backupset.ex_logs_path
1280                if "relativepath" in self.backupset.ex_logs_path:
1281                    self.backupset.ex_logs_path = \
1282                        self.backupset.ex_logs_path.replace("/tmp/", "~/")
1283            self.log.info("remove any old ex logs directory in {0}"
1284                                            .format(self.backupset.ex_logs_path))
1285            shell.execute_command("rm -rf {0}".format(self.backupset.ex_logs_path))
1286            if self.backupset.ex_logs_path == "non-exist-dir":
1287                self.log.info("test on non exist directory.")
1288            else:
1289                self.log.info("Create logs dir at {0}".format(self.backupset.ex_logs_path))
1290                shell.execute_command("mkdir {0}".format(self.backupset.ex_logs_path))
1291            ex_logs_path = self.backupset.ex_logs_path
1292            if info == 'windows':
1293                ex_logs_path = ex_logs_path.replace("/cygdrive/c", "c:")
1294            args += " -o {0}".format(ex_logs_path)
1295        log_archive_env = ""
1296        args_env = ""
1297        if self.backupset.log_archive_env:
1298            self.log.info("set log arvhive env to {0}".format(self.backupset.directory))
1299            log_archive_env = "unset CB_ARCHIVE_PATH; export CB_ARCHIVE_PATH={0}; "\
1300                                                      .format(self.backupset.directory)
1301            if self.backupset.ex_logs_path:
1302                self.log.info("overwrite env log path with flag -o")
1303                args_env = " -o {0}".format(ex_logs_path)
1304            command = "{0} {1}/cbbackupmgr collect-logs {2}"\
1305                                            .format(log_archive_env,
1306                                                    self.cli_command_location,
1307                                                    args_env)
1308        else:
1309            if "-o" in args and self.backupset.no_log_output_flag:
1310                args = args.replace("-o", " ")
1311            command = "{0} {1}/cbbackupmgr {2}"\
1312                                            .format(log_archive_env,
1313                                                    self.cli_command_location,
1314                                                    args)
1315        output, error = shell.execute_command(command)
1316        shell.disconnect()
1317        if self._check_output("Collecting logs succeeded", output):
1318            self._verify_cbbackupmgr_logs()
1319        elif self.backupset.no_log_output_flag and self._check_output("error", output):
1320            self.log.info("This is negative test")
1321        elif self.backupset.ex_logs_path:
1322            if "non-exist_dir" in self.backupset.ex_logs_path:
1323                self.log.info("This is negative test on non exist output logs dir")
1324            else:
1325                self.fail("Failed to collect logs")
1326        else:
1327            self.fail("Failed to collect logs. Output: {0}".format(output))
1328
1329    def _verify_cbbackupmgr_logs(self):
1330        shell = RemoteMachineShellConnection(self.backupset.backup_host)
1331        self.log.info("\n**** start to verify cbbackupmgr logs ****")
1332
1333        if not self.backupset.ex_logs_path:
1334            logs_path = self.backupset.directory + "/logs"
1335        elif self.backupset.ex_logs_path:
1336            logs_path = self.backupset.ex_logs_path
1337        if self.backupset.log_archive_env:
1338            logs_path = self.backupset.directory
1339            if self.backupset.ex_logs_path:
1340                logs_path = self.backupset.ex_logs_path
1341        output, _ = shell.execute_command("ls {0}".format(logs_path))
1342        if self.debug_logs:
1343            print "\nlog path: ", logs_path
1344            print "output : ", output
1345        if output:
1346            for x in output:
1347                if "collectinfo" in x:
1348                    shell.execute_command("cd {0}; unzip {1}"
1349                                             .format(logs_path, x))
1350        else:
1351            if self.backupset.log_archive_env:
1352                self.fail("Failed to pass CB_ARCHIVE_PATH")
1353            if self.backupset.ex_logs_path:
1354                self.fail("Failed to ")
1355        output, _ = shell.execute_command("ls {0}".format(logs_path))
1356        dir_list =  ["backup", "logs"]
1357        for ele in dir_list:
1358            if ele not in output:
1359                self.fail("Missing dir/file {0} in cbbackupmgr logs".format(ele))
1360
1361        output, _ = shell.execute_command("ls {0}".format(logs_path + "/backup"))
1362        if output and "backup-meta.json" not in output[0]:
1363            self.fail("Missing file 'backup-meta.json' in backup dir")
1364        output, _ = shell.execute_command("ls {0}".format(logs_path + "/logs"))
1365        if output and "backup.log" not in output[0]:
1366            self.fail("Missing file 'backup.log' in backup logs dir")
1367        shell.disconnect()
1368
1369    def _validate_restore_vbucket_filter(self):
1370        data_collector = DataCollector()
1371        vbucket_filter = self.vbucket_filter.split(",")
1372        shell = RemoteMachineShellConnection(self.backupset.backup_host)
1373        rest = RestConnection(self.backupset.restore_cluster_host)
1374        restore_buckets_items = rest.get_buckets_itemCount()
1375        restore_buckets = rest.get_buckets()
1376        for bucket in self.buckets:
1377            output, error = shell.execute_command("ls {0}/backup/*/{1}*/data "\
1378                                             .format(self.backupset.directory, bucket.name))
1379            filter_vbucket_keys = {}
1380            total_filter_keys = 0
1381            """ get vbucket keys pair in data base """
1382            if "shard_0.fdb" in output:
1383                for vb in vbucket_filter:
1384                    cmd = "{0}forestdb_dump{1} --plain-meta --no-body --kvs partition{3} "\
1385                      "{2}/backup/*/*/data/shard_0.fdb | grep 'Doc ID'"\
1386                                         .format(self.cli_command_location, self.cmd_ext,\
1387                                                             self.backupset.directory, vb)
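                    # forestdb_dump lists the docs stored in this vbucket's kvstore
                    # partition; grep 'Doc ID' keeps only the key lines.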
1388                    dump_output, error = shell.execute_command(cmd)
1389                    if dump_output:
1390                        dump_output = [x.replace("Doc ID: ", "") for x in dump_output]
1391
1392                        filter_vbucket_keys[vb] = dump_output
1393                        total_filter_keys += len(dump_output)
1394                        if self.debug_logs:
1395                            print("dump output: ", dump_output)
1396                if filter_vbucket_keys:
1397                    if int(restore_buckets_items[bucket.name]) != total_filter_keys:
1398                        mesg = "** Failed to restore keys from vbucket-filter. "
1399                        mesg += "\nTotal filter keys in backup file {0}".format(total_filter_keys)
1400                        mesg += "\nTotal keys in bucket {0}: {1}".format(bucket.name,
1401                                                                         restore_buckets_items)
1402                        self.fail(mesg)
1403                    else:
1404                        self.log.info("Success restore items from vbucket-filter")
1405                        vbuckets = rest.get_vbuckets(bucket.name)
1406                        headerInfo, bucket_data = \
1407                               data_collector.collect_data([self.backupset.restore_cluster_host],
1408                                                           [bucket], perNode=False,
1409                                                           getReplica=False, mode="memory")
1410                        client = VBucketAwareMemcached(rest, bucket.name)
1411                        self.log.info(" ** vbuckets should be restore: {0}".format(vbucket_filter))
1412                        for key in bucket_data[bucket.name]:
1413                            vBucketId = client._get_vBucket_id(key)
1414                            if self.debug_logs:
1415                                print("This key {0} in vbucket {1}".format(key, vBucketId))
1416                            if str(vBucketId) not in vbucket_filter:
1417                                self.fail("vbucketId {0} of key {1} not from vbucket filters {2}"\
1418                                               .format(vBucketId, key, vbucket_filter))
1419                else:
1420                    self.log.info("No keys with vbucket filter {0} restored".format(vbucket_filter))
1421            else:
1422                raise Exception("file shard_0.fdb did not created ")
1423        shell.disconnect()
1424
1425    def _validate_restore_replace_ttl_with(self, ttl_set):
1426        data_collector = DataCollector()
1427        bk_file_data, _ = data_collector.get_kv_dump_from_backup_file(self.backupset.backup_host,
1428                                      self.cli_command_location, self.cmd_ext,
1429                                      self.backupset.directory, "ent-backup",
1430                                      self.buckets)
1431        shell = RemoteMachineShellConnection(self.backupset.backup_host)
1432        rest = RestConnection(self.backupset.restore_cluster_host)
1433        restore_buckets_items = rest.get_buckets_itemCount()
1434        buckets = rest.get_buckets()
1435        keys_fail = {}
1436        for bucket in buckets:
1437            keys_fail[bucket.name] = {}
1438            if len(bk_file_data[bucket.name].keys()) != \
1439                                    int(restore_buckets_items[bucket.name]):
1440                self.fail("Total keys do not match")
1441            items_info = rest.get_items_info(bk_file_data[bucket.name].keys(),
1442                                                                    bucket.name)
1443            ttl_matched = True
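            # replace_ttl="all" requires every key to carry the new ttl; "expired"
            # only requires it when the backup was taken with a ttl set (bk_with_ttl).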
1444            for key in bk_file_data[bucket.name].keys():
1445                if items_info[key]['meta']['expiration'] != int(ttl_set):
1446                    if self.replace_ttl == "all":
1447                        ttl_matched = False
1448                    if self.replace_ttl == "expired" and self.bk_with_ttl is not None:
1449                        ttl_matched = False
1450                    keys_fail[bucket.name][key] = []
1451                    keys_fail[bucket.name][key].append(items_info[key]['meta']['expiration'])
1452                    keys_fail[bucket.name][key].append(int(ttl_set))
1453                    if self.debug_logs:
1454                        print("ttl time set: ", ttl_set)
1455                        print("key {0} failed to set ttl with {1}".format(key,
1456                                   items_info[key]['meta']['expiration']))
1457
1458            if not ttl_matched:
1459                self.fail("ttl value did not set correctly")
1460            else:
1461                self.log.info("all ttl value set matched")
1462            if int(self.replace_ttl_with) == 0:
1463                if len(bk_file_data[bucket.name].keys()) != \
1464                                int(restore_buckets_items[bucket.name]):
1465                    self.fail("Keys do not restore with ttl set to 0")
1466            elif int(self.replace_ttl_with) > 0:
1467                output, _ = shell.execute_command("date +%s")
1468                sleep_time = int(ttl_set) - int(output[0])
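                # `date +%s` gives the current epoch time on the backup host;
                # sleep_time is the time left until the replaced ttl expires.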
1469                if sleep_time > 0:
1470                    if self.replace_ttl == "expired" and self.bk_with_ttl is None:
1471                        self.log.info("No need to wait.  No expired items in backup ")
1472                    elif sleep_time < 600:
1473                        self.sleep(sleep_time + 10, "wait for items to expire")
1474                    else:
1475                        self.log.info("No need to wait.  Expired time set > 5 mins")
1476                else:
1477                    self.log.info("time expired.  No need to wait")
1478
1479                BucketOperationHelper.verify_data(self.backupset.restore_cluster_host,
1480                                                  bk_file_data[bucket.name].keys(),
1481                                                  False, False, self, bucket=bucket.name)
1482                self.sleep(10, "wait for bucket update new stats")
1483                restore_buckets_items = rest.get_buckets_itemCount()
1484                if int(restore_buckets_items[bucket.name]) > 0:
1485                    if self.replace_ttl == "expired" and self.bk_with_ttl is not None:
1486                        if sleep_time < 600:
1487                            self.fail("Key did not expire after wait more than 10 seconds of ttl")
1488        shell.disconnect()
1489
1490    def _verify_bucket_compression_mode(self, restore_bucket_compression_mode):
1491        if self.enable_firewall:
1492            self.sleep(10)
1493        rest = RestConnection(self.backupset.restore_cluster_host)
1494        cb_version = rest.get_nodes_version()
1495        if 5.5 > float(cb_version[:3]):
1496            self.log.info("This version is pre vulcan.  No need to verify compression mode.")
1497            return
1498        buckets = rest.get_buckets()
1499        bucket_compression_mode = []
1500        for bucket in buckets:
1501            compressionMode = rest.get_bucket_compressionMode(bucket=bucket.name)
1502            bucket_compression_mode.append(compressionMode)
1503        for compressionMode in bucket_compression_mode:
1504            if compressionMode != restore_bucket_compression_mode:
1505                self.fail("cbbackupmgr modified bucket pre-config.")
1506
1507    def generate_docs_simple(self, num_items, start=0):
1508        from couchbase_helper.tuq_generators import JsonGenerator
1509        json_generator = JsonGenerator()
1510        return json_generator.generate_docs_simple(start=start, docs_per_day=self.docs_per_day)
1511
1512    def create_save_function_body_test(self, appname, appcode, description="Sample Description",
1513                                  checkpoint_interval=10000, cleanup_timers=False,
1514                                  dcp_stream_boundary="everything", deployment_status=True,
1515                                  skip_timer_threshold=86400,
1516                                  sock_batch_size=1, tick_duration=60000, timer_processing_tick_interval=500,
1517                                  timer_worker_pool_size=3, worker_count=3, processing_status=True,
1518                                  cpp_worker_thread_count=1, multi_dst_bucket=False, execution_timeout=3,
1519                                  data_chan_size=10000, worker_queue_cap=100000, deadline_timeout=6
1520                                  ):
1521        body = {}
1522        body['appname'] = appname
1523        script_dir = os.path.dirname(__file__)
1524        abs_file_path = os.path.join(script_dir, appcode)
1525        fh = open(abs_file_path, "r")
1526        body['appcode'] = fh.read()
1527        fh.close()
1528        body['depcfg'] = {}
1529        body['depcfg']['buckets'] = []
1530        body['depcfg']['buckets'].append({"alias": self.dst_bucket_name, "bucket_name": self.dst_bucket_name})
1531        if multi_dst_bucket:
1532            body['depcfg']['buckets'].append({"alias": self.dst_bucket_name1, "bucket_name": self.dst_bucket_name1})
1533        body['depcfg']['metadata_bucket'] = self.metadata_bucket_name
1534        body['depcfg']['source_bucket'] = self.src_bucket_name
1535        body['settings'] = {}
1536        body['settings']['checkpoint_interval'] = checkpoint_interval
1537        body['settings']['cleanup_timers'] = cleanup_timers
1538        body['settings']['dcp_stream_boundary'] = dcp_stream_boundary
1539        body['settings']['deployment_status'] = deployment_status
1540        body['settings']['description'] = description
1541        body['settings']['log_level'] = self.eventing_log_level
1542        body['settings']['skip_timer_threshold'] = skip_timer_threshold
1543        body['settings']['sock_batch_size'] = sock_batch_size
1544        body['settings']['tick_duration'] = tick_duration
1545        body['settings']['timer_processing_tick_interval'] = timer_processing_tick_interval
1546        body['settings']['timer_worker_pool_size'] = timer_worker_pool_size
1547        body['settings']['worker_count'] = worker_count
1548        body['settings']['processing_status'] = processing_status
1549        body['settings']['cpp_worker_thread_count'] = cpp_worker_thread_count
1550        body['settings']['execution_timeout'] = execution_timeout
1551        body['settings']['data_chan_size'] = data_chan_size
1552        body['settings']['worker_queue_cap'] = worker_queue_cap
1553        body['settings']['use_memory_manager'] = self.use_memory_manager
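        # Keep deadline_timeout one second above execution_timeout whenever a
        # non-default execution_timeout is supplied.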
1554        if execution_timeout != 3:
1555            deadline_timeout = execution_timeout + 1
1556        body['settings']['deadline_timeout'] = deadline_timeout
1557        return body
1558
1559    def _verify_backup_events_definition(self, bk_fxn):
1560        backup_path = self.backupset.directory + "/backup/{0}/".format(self.backups[0])
1561        events_file_name = "events.json"
1562        bk_file_events_dir = "/tmp/backup_events{0}/".format(self.master.ip)
1563        bk_file_events_path = bk_file_events_dir + events_file_name
1564
1565        shell = RemoteMachineShellConnection(self.backupset.backup_host)
1566        self.log.info("create local dir")
1567        if os.path.exists(bk_file_events_dir):
1568            shutil.rmtree(bk_file_events_dir)
1569        os.makedirs(bk_file_events_dir)
1570        self.log.info("copy eventing definition from remote to local")
1571        shell.copy_file_remote_to_local(backup_path+events_file_name,
1572                                        bk_file_events_path)
1573        local_bk_def = open(bk_file_events_path)
1574        bk_file_fxn = json.loads(local_bk_def.read())
1575        for k, v in bk_file_fxn[0]["settings"].iteritems():
1576            if v != bk_fxn[0]["settings"][k]:
1577                self.log.info("key {0} has value not match".format(k))
1578                self.log.info("{0} : {1}".format(v, bk_fxn[0]["settings"][k]))
1579        self.log.info("remove tmp file in slave")
1580        if os.path.exists(bk_file_events_dir):
1581            shutil.rmtree(bk_file_events_dir)
1582        shell.disconnect()
1583
1584    def _verify_restore_events_definition(self, bk_fxn):
1585        backup_path = self.backupset.directory + "/backup/{0}/".format(self.backups[0])
1586        events_file_name = "events.json"
1587        bk_file_events_dir = "/tmp/backup_events{0}/".format(self.master.ip)
1588        bk_file_events_path = bk_file_events_dir + events_file_name
1589        rest = RestConnection(self.backupset.restore_cluster_host)
1590        rs_fxn = rest.get_all_functions()
1591
1592        if self.ordered(rs_fxn) != self.ordered(bk_fxn):
1593            self.fail("Events definition of backup and restore cluster are different")
1594
1595    def ordered(self, obj):
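        """
            Recursively sort nested dicts and lists so two structures can be
            compared regardless of key or element order.
        """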
1596        if isinstance(obj, dict):
            return sorted((k, self.ordered(v)) for k, v in obj.items())
        if isinstance(obj, list):
            return sorted(self.ordered(x) for x in obj)
1600        else:
1601            return obj
1602
1603
1604class Backupset:
1605    def __init__(self):
1606        self.backup_host = None
1607        self.directory = ''
1608        self.name = ''
1609        self.cluster_host = None
1610        self.cluster_host_username = ''
1611        self.cluster_host_password = ''
1612        self.cluster_new_user = None
1613        self.cluster_new_role = None
1614        self.restore_cluster_host = None
1615        self.restore_cluster_host_username = ''
1616        self.restore_cluster_host_password = ''
1617        self.threads = ''
1618        self.exclude_buckets = []
1619        self.include_buckets = []
1620        self.disable_bucket_config = False
1621        self.disable_views = False
1622        self.disable_gsi_indexes = False
1623        self.disable_ft_indexes = False
1624        self.disable_data = False
1625        self.disable_conf_res_restriction = False
1626        self.force_updates = False
1627        self.resume = False
1628        self.purge = False
1629        self.start = 1
1630        self.end = 1
1631        self.number_of_backups = 1
1632        self.number_of_backups_after_upgrade = 1
1633        self.filter_keys = ''
1634        self.filter_values = ''
1635        self.no_ssl_verify = False
1636        self.random_keys = False
1637        self.secure_conn = False
1638        self.bk_no_cert = False
1639        self.rt_no_cert = False
1640        self.backup_list_name = ''
1641        self.backup_incr_backup = ''
1642        self.bucket_backup = ''
1643        self.backup_to_compact = ''
1644        self.map_buckets = None
1645        self.delete_old_bucket = False
1646        self.backup_compressed = False
1647        self.user_env = False
1648        self.log_archive_env = False
1649        self.no_log_output_flag = False
1650        self.ex_logs_path = None
1651        self.passwd_env = False
1652        self.overwrite_user_env = False
1653        self.overwrite_passwd_env = False
1654        self.user_env_with_prompt = False
1655        self.passwd_env_with_prompt = False
1656        self.deleted_buckets = []
1657        self.new_buckets = []
1658        self.flushed_buckets = []
1659        self.deleted_backups = []
1660
1661
1662class EnterpriseBackupMergeBase(EnterpriseBackupRestoreBase):
1663    def setUp(self):
1664        super(EnterpriseBackupMergeBase, self).setUp()
1665        self.actions = self.input.param("actions", None)
1666        self.document_type = self.input.param("document_type", "json")
1667        if self.document_type == "binary":
1668            self.initial_load_gen = BlobGenerator("ent-backup", "ent-backup-",
1669                                                  self.value_size,
1670                                                  start=0,
1671                                                  end=self.num_items)
1672            self.create_gen = BlobGenerator("ent-backup", "ent-backup-",
1673                                            self.value_size, start=self.num_items,
1674                                            end=self.num_items + self.num_items
1675                                                                 * 0.5)
1676            self.update_gen = BlobGenerator("ent-backup", "ent-backup-",
1677                                            self.value_size,
1678                                            end=self.num_items * 0.9)
1679
1680            self.delete_gen = BlobGenerator("ent-backup", "ent-backup-",
1681                                            self.value_size,
1682                                            start=self.num_items / 10,
1683                                            end=self.num_items)
1684        elif self.document_type == "json":
1685            age = range(5)
1686            first = ['james', 'sharon']
1687            template = '{{ "age": {0}, "first_name": "{1}" }}'
1688            gen = DocumentGenerator('test_docs', template, age, first, start=0,
1689                                    end=5)
1690            self.initial_load_gen = DocumentGenerator('test_docs', template,
1691                                                      age, first, start=0,
1692                                                      end=self.num_items)
1693            self.create_gen = DocumentGenerator('test_docs', template,
1694                                                age, first,
1695                                                start=self.num_items,
1696                                                end=self.num_items +
1697                                                    self.num_items * 0.5)
1698            self.update_gen = DocumentGenerator('test_docs', template,
1699                                                age, first,
1700                                                end=self.num_items * 0.9)
1701
1702            self.delete_gen = DocumentGenerator('test_docs', template,
1703                                                age, first,
1704                                                start=self.num_items / 10,
1705                                                end=self.num_items)
1706        self.new_buckets = 1
1707        self.bucket_to_flush = 1
1708        self.ephemeral = False
1709        self.recreate_bucket = False
1710        self.graceful = True
1711        self.recoveryType = 'full'
1712        self.nodes_in = self.input.param("nodes_in", 0)
1713        self.nodes_out = self.input.param("nodes_out", 0)
1714        self.number_of_repeats = self.input.param("repeats", 1)
1715        self.backup_to_corrupt = 0
1716        self.backup_to_delete = 0
1717        self.skip_restore_indexes = self.input.param("skip_restore_indexe", False)
1718        self.overwrite_indexes = self.input.param("overwrite_indexes", False)
1719        self.skip_validation = self.input.param("skip_validation", False)
1720
1721    def tearDown(self):
1722        super(EnterpriseBackupMergeBase, self).tearDown()
1723
1724    def _get_python_sdk_client(self, ip, bucket, cluster_host):
1725        try:
1726            role_del = [bucket.name]
1727            RbacBase().remove_user_role(role_del, RestConnection(cluster_host))
1728        except Exception, ex:
1729            self.log.info(str(ex))
1730            self.assertTrue(str(ex) == '"User was not found."', str(ex))
1731
1732        testuser = [{'id': bucket.name, 'name': bucket.name, 'password': 'password'}]
1733        RbacBase().create_user_source(testuser, 'builtin', cluster_host)
1734        self.sleep(10)
1735
1736        role_list = [{'id': bucket.name, 'name': bucket.name, 'roles': 'admin'}]
1737        RbacBase().add_user_role(role_list, RestConnection(cluster_host), 'builtin')
1738        self.sleep(10)
1739
1740        try:
1741            cb = Bucket('couchbase://' + ip + '/' + bucket.name, password='password')
1742            if cb is not None:
1743                self.log.info("Established connection to bucket " + bucket.name + " on " + ip + " using python SDK")
1744            else:
1745                self.fail("Failed to connect to bucket " + bucket.name + " on " + ip + " using python SDK")
1746            return cb
1747        except Exception, ex:
1748            self.fail(str(ex))
1749
1750    def async_ops_on_buckets(self):
1751        """
1752        Performs async operations on all the buckets in the cluster.
1753        The operations are: creates, updates and deletes
1754        :return: List of tasks running the load operations.
1755        """
1756        tasks = []
1757        create_tasks = self._async_load_all_buckets(self.master,
1758                                                    self.create_gen,
1759                                                    "create", self.expires)
1760        update_tasks = self._async_load_all_buckets(self.master,
1761                                                    self.update_gen,
1762                                                    "update", self.expires)
1763        delete_tasks = self._async_load_all_buckets(self.master,
1764                                                    self.delete_gen,
1765                                                    "delete", self.expires)
1766        tasks.extend(create_tasks)
1767        tasks.extend(update_tasks)
1768        tasks.extend(delete_tasks)
1769        return tasks
1770
1771    def ops_on_buckets(self):
1772        ops_tasks = self.async_ops_on_buckets()
1773        for task in ops_tasks:
1774            task.result()
1775
1776    def backup(self):
1777        """
1778        Backup the cluster and validate the backupset.
1779        :return: Nothing
1780        """
1781        self.backup_cluster_validate(repeats=self.number_of_repeats)
1782
1783    def backup_with_expiry(self):
1784        """
1785        Backup the cluster and validate the backupset with expiry items before they expire.
1786        :return: Nothing
1787        """
1788        for bucket in self.buckets:
1789            cb = self._get_python_sdk_client(self.master.ip, bucket, self.backupset.cluster_host)
1790            for i in range(int(self.num_items * 0.7) + 1, self.num_items + 1):
1791                cb.upsert("doc" + str(i), {"key":"value"}, ttl=self.expires)
1792        self.backup_cluster_validate()
1793        self.sleep(self.expires)
1794
1795    def backup_after_expiry(self):
1796        """
1797        Backup the cluster and validate the backupset with expiry items after they expire.
1798        :return: Nothing
1799        """
1800        for bucket in self.buckets:
1801            cb = self._get_python_sdk_client(self.master.ip, bucket, self.backupset.cluster_host)
1802            for i in range(int(self.num_items * 0.7) + 1, self.num_items + 1):
1803                cb.upsert("doc" + str(i), {"key":"value"}, ttl=self.expires)
1804        self.sleep(self.expires)
1805        self.backup_cluster_validate()
1806
1807    def backup_with_ops(self):
1808        """
1809        Backup the data while loading the buckets in parallel.
1810        :return: Nothing
1811        """
1812        ops_tasks = self.async_ops_on_buckets()
1813        self.backup()
1814        self.log.info(ops_tasks)
1815        for task in ops_tasks:
1816            task.result()
1817
1818    def backup_with_memcached_crash_and_restart(self):
1819        backup_result = self.cluster.async_backup_cluster(
1820                                           cluster_host=self.backupset.cluster_host,
1821                                           backup_host=self.backupset.backup_host,
1822                                           directory=self.backupset.directory,
1823                                           name=self.backupset.name,
1824                                           resume=self.backupset.resume,
1825                                           purge=self.backupset.purge,
1826                                           no_progress_bar=self.no_progress_bar,
1827                                           cli_command_location=self.cli_command_location,
1828                                           cb_version=self.cb_version)
1829        self.sleep(5)
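        # Pause memcached for a few seconds mid-backup to simulate a crash and
        # restart; the backup is expected to recover and complete successfully.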
1830        conn_bk = RemoteMachineShellConnection(self.backupset.cluster_host)
1831        conn_bk.pause_memcached(timesleep=8)
1832        conn_bk.unpause_memcached()
1833        conn_bk.disconnect()
1834        output = backup_result.result(timeout=200)
1835        if self.debug_logs:
1836            if output:
1837                print "\nOutput from backup cluster: %s " % output
1838            else:
1839                self.fail("No output printout.")
1840        self.assertTrue(self._check_output("Backup successfully completed", output),
1841                        "Backup failed with memcached crash and restart within 180 seconds")
1842        self.log.info("Backup succeeded with memcached crash and restart within 180 seconds")
1843        self.sleep(20)
1844        conn = RemoteMachineShellConnection(self.backupset.backup_host)
1845        command = "ls -tr {0}/{1} | tail -1".format(self.backupset.directory,
1846                                                    self.backupset.name)
1847        o, e = conn.execute_command(command)
1848        if o:
1849            self.backups.append(o[0])
1850        conn.log_command_output(o, e)
1851        self.number_of_backups_taken += 1
1852        self.store_vbucket_seqno()
1853        self.validation_helper.store_keys(self.cluster_to_backup, self.buckets,
1854                                          self.number_of_backups_taken,
1855                                          self.backup_validation_files_location)
1856        self.validation_helper.store_latest(self.cluster_to_backup, self.buckets,
1857                                            self.number_of_backups_taken,
1858                                            self.backup_validation_files_location)
1859        self.validation_helper.store_range_json(self.buckets, self.number_of_backups_taken,
1860                                         self.backup_validation_files_location, merge=True)
1861        conn.disconnect()
1862
1863    def _check_output(self, word_check, output):
1864        found = False
        if len(output) >= 1:
1866            for x in output:
1867                if word_check.lower() in x.lower():
1868                    self.log.info("Found \"%s\" in CLI output" % word_check)
1869                    found = True
1870                    break
1871        return found
1872
1873    def backup_with_erlang_crash_and_restart(self):
1874        backup_result = self.cluster.async_backup_cluster(cluster_host=self.backupset.cluster_host,
1875                                                          backup_host=self.backupset.backup_host,
1876                                                          directory=self.backupset.directory, name=self.backupset.name,
1877                                                          resume=self.backupset.resume, purge=self.backupset.purge,
1878                                                          no_progress_bar=self.no_progress_bar,
1879                                                          cli_command_location=self.cli_command_location,
1880                                                          cb_version=self.cb_version)
1881        self.sleep(10)
1882        conn = RemoteMachineShellConnection(self.backupset.cluster_host)
1883        conn.kill_erlang()
1884        conn.start_couchbase()
1885        output = backup_result.result(timeout=200)
1886        self.assertTrue(self._check_output("Backup successfully completed", output),
1887                        "Backup failed with erlang crash and restart within 180 seconds")
1888        self.log.info("Backup succeeded with erlang crash and restart within 180 seconds")
1889        self.sleep(30)
1890        conn.disconnect()
1891        conn = RemoteMachineShellConnection(self.backupset.backup_host)
1892        command = "ls -tr {0}/{1} | tail -1".format(self.backupset.directory, self.backupset.name)
1893        o, e = conn.execute_command(command)
1894        if o:
1895            self.backups.append(o[0])
1896        conn.log_command_output(o, e)
1897        self.number_of_backups_taken += 1
1898        self.store_vbucket_seqno()
1899        self.validation_helper.store_keys(self.cluster_to_backup, self.buckets, self.number_of_backups_taken,
1900                                          self.backup_validation_files_location)
1901        self.validation_helper.store_latest(self.cluster_to_backup, self.buckets, self.number_of_backups_taken,
1902                                            self.backup_validation_files_location)
1903        self.validation_helper.store_range_json(self.buckets, self.number_of_backups_taken,
1904                                                self.backup_validation_files_location)
1905        conn.disconnect()
1906
1907    def backup_with_cb_server_stop_and_restart(self):
1908        backup_result = self.cluster.async_backup_cluster(cluster_host=self.backupset.cluster_host,
1909                                                          backup_host=self.backupset.backup_host,
1910                                                          directory=self.backupset.directory, name=self.backupset.name,
1911                                                          resume=self.backupset.resume, purge=self.backupset.purge,
1912                                                          no_progress_bar=self.no_progress_bar,
1913                                                          cli_command_location=self.cli_command_location,
1914                                                          cb_version=self.cb_version)
1915        self.sleep(10)
1916        conn = RemoteMachineShellConnection(self.backupset.cluster_host)
1917        conn.stop_couchbase()
1918        conn.start_couchbase()
1919        output = backup_result.result(timeout=200)
1920        self.assertTrue(self._check_output("Backup successfully completed", output),
1921                        "Backup failed with couchbase stop and start within 180 seconds")
1922        self.log.info("Backup succeeded with couchbase stop and start within 180 seconds")
1923        self.sleep(30)
1924        conn.disconnect()
1925        conn = RemoteMachineShellConnection(self.backupset.backup_host)
1926        command = "ls -tr {0}/{1} | tail -1".format(self.backupset.directory, self.backupset.name)
1927        o, e = conn.execute_command(command)
1928        if o:
1929            self.backups.append(o[0])
1930        conn.log_command_output(o, e)
1931        self.number_of_backups_taken += 1
1932        self.store_vbucket_seqno()
1933        self.validation_helper.store_keys(self.cluster_to_backup, self.buckets, self.number_of_backups_taken,
1934                                          self.backup_validation_files_location)
1935        self.validation_helper.store_latest(self.cluster_to_backup, self.buckets, self.number_of_backups_taken,
1936                                            self.backup_validation_files_location)
1937        self.validation_helper.store_range_json(self.buckets, self.number_of_backups_taken,
1938                                                self.backup_validation_files_location)
1939        conn.disconnect()
1940
1941    def merge(self):
1942        """
1943        Merge all the existing backups in the backupset.
1944        :return: Nothing
1945        """
1946        self.backup_merge_validate(repeats=self.number_of_repeats, skip_validation=self.skip_validation)
1947
1948    def merge_with_ops(self):
1949        """
1950        Merge all the existing backups in the backupset while loading the
1951        buckets in parallel
1952        :return: Nothing
1953        """
1954        ops_tasks = self.async_ops_on_buckets()
1955        self.merge()
1956        for task in ops_tasks:
1957            task.result()
1958
1959    def merge_with_memcached_crash_and_restart(self):
1960        self.log.info("backups before merge: " + str(self.backups))
1961        self.log.info("number_of_backups_taken before merge: " + str(self.number_of_backups_taken))
1962        merge_result = self.cluster.async_merge_cluster(backup_host=self.backupset.backup_host,
1963                                                        backups=self.backups,
1964                                                        directory=self.backupset.directory, name=self.backupset.name,
1965                                                        cli_command_location=self.cli_command_location,
1966                                                        start=int(self.backupset.start),
1967                                                        end=int(self.backupset.end)
1968                                                        )
1969        conn = RemoteMachineShellConnection(self.backupset.backup_host)
1970        conn.pause_memcached()
1971        conn.unpause_memcached()
1972        output = merge_result.result(timeout=200)
1973        self.assertTrue(self._check_output("Merge completed successfully", output),
1974                        "Merge failed with memcached crash and restart within 180 seconds")
1975        self.log.info("Merge succeeded with memcached crash and restart within 180 seconds")
1976        self.sleep(30)
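        # The merged range collapses into a single backup directory: drop the old
        # entries from bookkeeping and record the newest directory from ls -tr.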
1977        del self.backups[self.backupset.start - 1:self.backupset.end]
1978        command = "ls -tr {0}/{1} | tail -1".format(self.backupset.directory, self.backupset.name)
1979        o, e = conn.execute_command(command)
1980        if o:
1981            self.backups.insert(self.backupset.start - 1, o[0])
1982        self.number_of_backups_taken -= (self.backupset.end - self.backupset.start + 1)
1983        self.number_of_backups_taken += 1
1984        self.log.info("backups after merge: " + str(self.backups))
1985        self.log.info("number_of_backups_taken after merge: " + str(self.number_of_backups_taken))
1986        if self.number_of_repeats < 2:
1987            self.validation_helper.store_keys(self.cluster_to_backup, self.buckets, self.number_of_backups_taken,
1988                                              self.backup_validation_files_location)
1989            self.validation_helper.store_latest(self.cluster_to_backup, self.buckets, self.number_of_backups_taken,
1990                                                self.backup_validation_files_location)
1991            self.validation_helper.store_range_json(self.buckets, self.number_of_backups_taken,
1992                                                    self.backup_validation_files_location)
1993            self.validation_helper.validate_merge(self.backup_validation_files_location)
1994        conn.disconnect()

    def merge_with_erlang_crash_and_restart(self):
        """
        Merge the backups in the given range while the erlang process on the
        backup host is killed and Couchbase Server is restarted mid-merge.
        :return: Nothing
        """
        self.log.info("backups before merge: " + str(self.backups))
        self.log.info("number_of_backups_taken before merge: " + str(self.number_of_backups_taken))
        merge_result = self.cluster.async_merge_cluster(backup_host=self.backupset.backup_host,
                                                        backups=self.backups,
                                                        directory=self.backupset.directory, name=self.backupset.name,
                                                        cli_command_location=self.cli_command_location,
                                                        start=int(self.backupset.start),
                                                        end=int(self.backupset.end)
                                                        )
        conn = RemoteMachineShellConnection(self.backupset.backup_host)
        conn.kill_erlang()
        conn.start_couchbase()
        output = merge_result.result(timeout=200)
        self.assertTrue(self._check_output("Merge completed successfully", output),
                        "Merge failed with erlang crash and restart within 200 seconds")
        self.log.info("Merge succeeded with erlang crash and restart within 200 seconds")
        self.sleep(30)
        del self.backups[self.backupset.start - 1:self.backupset.end]
        command = "ls -tr {0}/{1} | tail -1".format(self.backupset.directory, self.backupset.name)
        o, e = conn.execute_command(command)
        if o:
            self.backups.insert(self.backupset.start - 1, o[0])
        self.number_of_backups_taken -= (self.backupset.end - self.backupset.start + 1)
        self.number_of_backups_taken += 1
        self.log.info("backups after merge: " + str(self.backups))
        self.log.info("number_of_backups_taken after merge: " + str(self.number_of_backups_taken))
        if self.number_of_repeats < 2:
            self.validation_helper.store_keys(self.cluster_to_backup, self.buckets, self.number_of_backups_taken,
                                              self.backup_validation_files_location)
            self.validation_helper.store_latest(self.cluster_to_backup, self.buckets, self.number_of_backups_taken,
                                                self.backup_validation_files_location)
            self.validation_helper.store_range_json(self.buckets, self.number_of_backups_taken,
                                                    self.backup_validation_files_location)
            self.validation_helper.validate_merge(self.backup_validation_files_location)
        conn.disconnect()

    def merge_with_cb_server_stop_and_restart(self):
        """
        Merge the backups in the given range while Couchbase Server on the
        backup host is stopped and started mid-merge.
        :return: Nothing
        """
        self.log.info("backups before merge: " + str(self.backups))
        self.log.info("number_of_backups_taken before merge: " + str(self.number_of_backups_taken))
        merge_result = self.cluster.async_merge_cluster(backup_host=self.backupset.backup_host,
                                                        backups=self.backups,
                                                        directory=self.backupset.directory, name=self.backupset.name,
                                                        cli_command_location=self.cli_command_location,
                                                        start=int(self.backupset.start),
                                                        end=int(self.backupset.end)
                                                        )
        conn = RemoteMachineShellConnection(self.backupset.backup_host)
        conn.stop_couchbase()
        conn.start_couchbase()
        output = merge_result.result(timeout=200)
        self.assertTrue(self._check_output("Merge completed successfully", output),
                        "Merge failed with couchbase stop and start within 200 seconds")
        self.log.info("Merge succeeded with couchbase stop and start within 200 seconds")
        self.sleep(30)
        del self.backups[self.backupset.start - 1:self.backupset.end]
        command = "ls -tr {0}/{1} | tail -1".format(self.backupset.directory, self.backupset.name)
        o, e = conn.execute_command(command)
        if o:
            self.backups.insert(self.backupset.start - 1, o[0])
        self.number_of_backups_taken -= (self.backupset.end - self.backupset.start + 1)
        self.number_of_backups_taken += 1
        self.log.info("backups after merge: " + str(self.backups))
        self.log.info("number_of_backups_taken after merge: " + str(self.number_of_backups_taken))
        if self.number_of_repeats < 2:
            self.validation_helper.store_keys(self.cluster_to_backup, self.buckets, self.number_of_backups_taken,
                                              self.backup_validation_files_location)
            self.validation_helper.store_latest(self.cluster_to_backup, self.buckets, self.number_of_backups_taken,
                                                self.backup_validation_files_location)
            self.validation_helper.store_range_json(self.buckets, self.number_of_backups_taken,
                                                    self.backup_validation_files_location)
            self.validation_helper.validate_merge(self.backup_validation_files_location)
        conn.disconnect()
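
    # The three merge_with_*_crash_and_restart methods above share the same
    # merge / disrupt / verify flow.  Below is a minimal sketch of how that
    # flow could be factored into one helper; the method name and the
    # `disrupt` callable are illustrative only and are not used elsewhere in
    # this suite.
    def _merge_with_disruption_sketch(self, disrupt, description):
        merge_result = self.cluster.async_merge_cluster(backup_host=self.backupset.backup_host,
                                                        backups=self.backups,
                                                        directory=self.backupset.directory,
                                                        name=self.backupset.name,
                                                        cli_command_location=self.cli_command_location,
                                                        start=int(self.backupset.start),
                                                        end=int(self.backupset.end))
        conn = RemoteMachineShellConnection(self.backupset.backup_host)
        try:
            # Disrupt the backup host while the merge is in flight, e.g.
            # disrupt=lambda c: (c.kill_erlang(), c.start_couchbase())
            disrupt(conn)
            output = merge_result.result(timeout=200)
            self.assertTrue(self._check_output("Merge completed successfully", output),
                            "Merge failed with " + description)
        finally:
            conn.disconnect()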

    def compact_backup(self):
        """
        Compact the given backup and validate that the compaction resulted
        in a reduction of the backup size.
        :return: Nothing
        """
        self.backup_compact_validate()

    def async_rebalance(self):
        """
        Asynchronously rebalance the cluster.
        :return: The task that is rebalancing the nodes.
        """
        rest = RestConnection(self.master)
        # Get the nodes already in the backup cluster
        nodes_in_cluster = rest.node_statuses()
        # Get the nodes available for the backup cluster
        backup_cluster = [server for server in self.servers if (server not in
                          self.input.clusters[0] and server not in
                          self.input.clusters[1])]
        # Get the potential servers that can be added to the cluster
        serv_in = [server for server in backup_cluster if server.ip not in [
            node.ip for node in nodes_in_cluster]]
        # Get the potential servers that can be removed from the cluster
        serv_out = [server for server in backup_cluster if server.ip in [
            node.ip for node in nodes_in_cluster]]
        serv_in = serv_in[:self.nodes_in]
        serv_out = serv_out[len(serv_out) - self.nodes_out:]
        rebalance = self.cluster.async_rebalance(self.cluster_to_backup,
                                                 serv_in, serv_out)
        return rebalance
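
    # A self-contained sketch of the in/out node selection performed above,
    # using plain IP strings instead of server objects; the helper name and
    # parameters are illustrative only and are not called in this suite.
    @staticmethod
    def _select_rebalance_nodes_sketch(candidate_ips, cluster_ips, nodes_in, nodes_out):
        # Nodes not yet in the cluster are candidates to add (take the first
        # `nodes_in`); nodes already in the cluster are candidates to remove
        # (take the last `nodes_out`).
        serv_in = [ip for ip in candidate_ips if ip not in cluster_ips][:nodes_in]
        serv_out = [ip for ip in candidate_ips if ip in cluster_ips]
        serv_out = serv_out[len(serv_out) - nodes_out:]
        return serv_in, serv_out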

    def rebalance(self):
        """
        Rebalance the cluster
        :return: Nothing
        """
        rebalance = self.async_rebalance()
        rebalance.result()

    def rebalance_with_ops(self):
        """
        Rebalance the cluster while running bucket operations in parallel.
        :return: Nothing
        """
        ops_tasks = self.async_ops_on_buckets()
        self.rebalance()
        for task in ops_tasks:
            task.result()

    def async_failover(self, ip=None):
        """
        Asynchronously failover a node.
        :param ip: IP of the node to fail over; defaults to the second
                   server (self.servers[1]).
        :return: Nothing
        """
        rest = RestConnection(self.backupset.cluster_host)
        nodes_all = rest.node_statuses()
        if ip:
            ip_to_failover = ip
        else:
            ip_to_failover = self.servers[1].ip
        for node in nodes_all:
            if node.ip == ip_to_failover:
                rest.fail_over(otpNode=node.id, graceful=self.graceful)

    def async_failover_and_recover(self):
        """
        Asynchronously failover a node and add back the node after 60 sec.
        :return: Task that is running the rebalance at the end.
        """
        rest = RestConnection(self.backupset.cluster_host)
        nodes_all = rest.node_statuses()
        for node in nodes_all:
            if node.ip == self.servers[1].ip:
                rest.fail_over(otpNode=node.id, graceful=self.graceful)
                self.sleep(60)
                rest.set_recovery_type(otpNode=node.id,
                                       recoveryType=self.recoveryType)
                rest.add_back_node(otpNode=node.id)
        rebalance = self.cluster.async_rebalance(self.servers, [], [])
        return rebalance

    def async_recover_node(self):
        """
        Asynchronously add back the node after failover.
        :return: Task that is running the rebalance at the end.
        """
        rest = RestConnection(self.backupset.cluster_host)
        nodes_all = rest.node_statuses()
        for node in nodes_all:
            if node.ip == self.servers[1].ip:
                rest.set_recovery_type(otpNode=node.id,
                                       recoveryType=self.recoveryType)
                rest.add_back_node(otpNode=node.id)
        rebalance = self.cluster.async_rebalance(self.servers, [], [])
        return rebalance

    def async_remove_failed_node(self):
        """
        Asynchronously remove a failed-over node by performing a rebalance
        of the cluster.
        :return: Task that is running the rebalance.
        """
        rebalance = self.cluster.async_rebalance(self.servers, [], [])
        return rebalance

    def failover(self):
        """
        Failover a node.
        :return: Nothing
        """
        self.async_failover()

    def failover_with_ops(self):
        """
        Failover a node while bucket operations are running in parallel.
        :return: Nothing
        """
        ops_tasks = self.async_ops_on_buckets()
        self.failover()
        for task in ops_tasks:
            task.result()

    def failover_and_recover(self):
        """
        Failover a node and add back the node after 60 sec.
        :return: Nothing
        """
        task = self.async_failover_and_recover()
        task.result()

    def failover_and_recover_with_ops(self):
        """
        Failover a node and add back the node after 60 sec while bucket
        operations are running in parallel.
        :return: Nothing
        """
        ops_tasks = self.async_ops_on_buckets()
        self.failover_and_recover()
        for task in ops_tasks:
            task.result()

    def recover_node(self):
        """
        Recover a failed-over node.
        :return: Nothing
        """
        task = self.async_recover_node()
        task.result()

    def recover_node_with_ops(self):
        """
        Recover a failed-over node while bucket operations are running in
        parallel.
        :return: Nothing
        """
        ops_tasks = self.async_ops_on_buckets()
        self.recover_node()
        for task in ops_tasks:
            task.result()

    def remove_failed_node(self):
        """
        Remove a failed-over node from the cluster.
        :return: Nothing
        """
        task = self.async_remove_failed_node()
        task.result()

    def _reconfigure_bucket_memory(self, num_new_buckets):
        """
        Reconfigure the memory quotas of the existing buckets to accommodate
        adding new buckets.
        :param num_new_buckets: Number of new buckets to be added.
        :return: The new bucket size.
        """
        rest = RestConnection(self.master)
        ram_size = rest.get_nodes_self().memoryQuota
        bucket_size = self._get_bucket_size(ram_size, self.bucket_size +
                                            num_new_buckets)
        self.log.info("Changing the existing buckets size to accommodate new "
                      "buckets")
        for bucket in self.buckets:
            rest.change_bucket_props(bucket, ramQuotaMB=bucket_size)
        return bucket_size
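
    # A minimal sketch of the quota arithmetic this reconfiguration relies
    # on, assuming the node RAM quota is split roughly evenly across the
    # total bucket count (the real sizing logic lives in the inherited
    # _get_bucket_size helper and may differ); the helper below is
    # illustrative only.
    @staticmethod
    def _even_bucket_quota_sketch(node_quota_mb, total_buckets):
        # e.g. a 1024 MB node quota split across 4 buckets gives 256 MB each
        return node_quota_mb // total_buckets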

    def create_new_bucket_and_populate(self):
        """
        Create new buckets and populate them with items.
        :return: Nothing
        """
        bucket_size = self._reconfigure_bucket_memory(self.new_buckets)
        rest = RestConnection(self.master)
        server_id = rest.get_nodes_self().id
        bucket_tasks = []
        bucket_params = copy.deepcopy(
            self.bucket_base_params['membase']['non_ephemeral'])
        bucket_params['size'] = bucket_size
        if self.ephemeral:
            bucket_params['bucket_type'] = 'ephemeral'
            bucket_params['eviction_policy'] = 'noEviction'
        standard_buckets = []
        for i in range(0, self.new_buckets):
            if self.recreate_bucket:
                name = self.backupset.deleted_buckets[i]
            else:
                name = 'bucket' + str(i)
            port = STANDARD_BUCKET_PORT + i + 1
            bucket_priority = None
            if self.standard_bucket_priority is not None:
                bucket_priority = self.get_bucket_priority(
                    self.standard_bucket_priority[i])

            bucket_params['bucket_priority'] = bucket_priority
            bucket_tasks.append(
                self.cluster.async_create_standard_bucket(name=name, port=port,
                                                          bucket_params=bucket_params))
            bucket =