1import unittest 2import time 3import threading 4import os 5import subprocess 6import pprint 7import logging 8 9from TestInput import TestInputSingleton 10from lib import logger 11from lib import testconstants 12from lib.membase.api.rest_client import RestConnection, RestHelper, Bucket 13from lib.membase.helper.bucket_helper import BucketOperationHelper 14from lib.membase.helper.cluster_helper import ClusterOperationHelper 15from lib.membase.helper.rebalance_helper import RebalanceHelper 16from lib.membase.performance.stats import StatsCollector 17from lib.remote.remote_util import RemoteMachineShellConnection 18from lib.perf_engines.cbsoda import StoreCouchbase 19 20from pytests.performance.perf_defaults import PerfDefaults 21from lib.perf_engines import mcsoda 22 23 24class PerfBase(unittest.TestCase): 25 26 """ 27 specURL = http://hub.internal.couchbase.org/confluence/display/cbit/Black+Box+Performance+Test+Matrix 28 29 """ 30 31 # The setUpBaseX() methods allow subclasses to resequence the setUp() and 32 # skip cluster configuration. 33 def setUpBase0(self): 34 self.log = logger.Logger.get_logger() 35 self.input = TestInputSingleton.input 36 if self.input.param("log_level", None): 37 self.log.setLevel(level=0) 38 for hd in self.log.handlers: 39 if str(hd.__class__).find('FileHandler') != -1: 40 hd.setLevel(level=logging.DEBUG) 41 else: 42 hd.setLevel(level=getattr(logging, self.input.param("log_level", None))) 43 self.vbucket_count = PerfDefaults.vbuckets 44 self.sc = None 45 if self.parami("tear_down_on_setup", 46 PerfDefaults.tear_down_on_setup) == 1: 47 self.tearDown() # Tear down in case previous run had unclean death 48 master = self.input.servers[0] 49 self.set_up_rest(master) 50 51 def setUpBase1(self): 52 if max(self.parami('num_buckets', 1), 53 self.parami('xdcr_num_buckets', 1)) > 1: 54 bucket = 'bucket-0' 55 else: 56 bucket = self.param('bucket', 'default') 57 vBuckets = self.rest.get_vbuckets(bucket) 58 self.vbucket_count = len(vBuckets) if vBuckets else 0 59 60 def setUp(self): 61 self.setUpBase0() 62 63 mc_threads = self.parami("mc_threads", PerfDefaults.mc_threads) 64 if mc_threads != PerfDefaults.mc_threads: 65 for node in self.input.servers: 66 self.set_mc_threads(node, mc_threads) 67 68 erlang_schedulers = self.param("erlang_schedulers", 69 PerfDefaults.erlang_schedulers) 70 if erlang_schedulers: 71 ClusterOperationHelper.set_erlang_schedulers(self.input.servers, 72 erlang_schedulers) 73 master = self.input.servers[0] 74 75 self.is_multi_node = False 76 self.data_path = master.data_path 77 78 # Number of items loaded by load() method. 79 # Does not include or count any items that came from set_up_dgm(). 80 # 81 self.num_items_loaded = 0 82 83 if self.input.clusters: 84 for cluster in self.input.clusters.values(): 85 master = cluster[0] 86 self.set_up_rest(master) 87 self.set_up_cluster(master) 88 else: 89 master = self.input.servers[0] 90 self.set_up_cluster(master) 91 92 # Rebalance 93 if self.input.clusters: 94 for cluster in self.input.clusters.values(): 95 num_nodes = self.parami("num_nodes_before", len(cluster)) 96 self.rebalance_nodes(num_nodes, cluster) 97 else: 98 num_nodes = self.parami("num_nodes", 10) 99 self.rebalance_nodes(num_nodes) 100 101 if self.input.clusters: 102 for cluster in self.input.clusters.values(): 103 master = cluster[0] 104 self.set_up_rest(master) 105 self.set_up_buckets() 106 else: 107 self.set_up_buckets() 108 109 self.set_up_proxy() 110 111 if self.input.clusters: 112 for cluster in self.input.clusters.values(): 113 master = cluster[0] 114 self.set_up_rest(master) 115 self.reconfigure() 116 else: 117 self.reconfigure() 118 119 if self.parami("dgm", getattr(self, "dgm", 1)): 120 self.set_up_dgm() 121 122 time.sleep(10) 123 self.setUpBase1() 124 125 if self.input.clusters: 126 for cluster in self.input.clusters.values(): 127 self.wait_until_warmed_up(cluster[0]) 128 else: 129 self.wait_until_warmed_up() 130 ClusterOperationHelper.flush_os_caches(self.input.servers) 131 132 def set_up_rest(self, master): 133 self.rest = RestConnection(master) 134 self.rest_helper = RestHelper(self.rest) 135 136 def set_up_cluster(self, master): 137 """Initialize cluster""" 138 self.log.info("setting up cluster") 139 140 self.rest.init_cluster(master.rest_username, master.rest_password) 141 142 memory_quota = self.parami('mem_quota', PerfDefaults.mem_quota) 143 self.rest.init_cluster_memoryQuota(master.rest_username, 144 master.rest_password, 145 memoryQuota=memory_quota) 146 147 def _get_bucket_names(self, num_buckets): 148 """ 149 Get a list of bucket names 150 """ 151 if num_buckets > 1: 152 buckets = ['bucket-{0}'.format(i) for i in range(num_buckets)] 153 else: 154 buckets = [self.param('bucket', 'default')] 155 156 return buckets 157 158 def get_bucket_conf(self): 159 """ retrieve bucket configurations""" 160 161 num_buckets = max(self.parami('num_buckets', 1), 162 self.parami('xdcr_num_buckets', 1)) 163 self.buckets = self._get_bucket_names(num_buckets) 164 165 def set_up_buckets(self): 166 """Set up data bucket(s)""" 167 168 self.log.info("setting up buckets") 169 170 self.get_bucket_conf() 171 172 for bucket in self.buckets: 173 bucket_ram_quota = self.parami('mem_quota', PerfDefaults.mem_quota) 174 bucket_threads_num = self.parami('threads_number', PerfDefaults.threads_number) 175 bucket_ram_quota /= max(self.parami('num_buckets', 1), 176 self.parami('xdcr_num_buckets', 1)) 177 replicas = self.parami('replicas', getattr(self, 'replicas', 1)) 178 index_replicas = self.parami('index_replicas', 0) 179 180 self.rest.create_bucket(bucket=bucket, ramQuotaMB=bucket_ram_quota, 181 replicaNumber=replicas, authType='sasl', 182 threadsNumber=bucket_threads_num, 183 replica_index=index_replicas) 184 185 status = self.rest_helper.vbucket_map_ready(bucket, 60) 186 self.assertTrue(status, msg='vbucket_map not ready .. timed out') 187 status = self.rest_helper.bucket_exists(bucket) 188 self.assertTrue(status, 189 msg='unable to create {0} bucket'.format(bucket)) 190 191 def reconfigure(self): 192 """Customize basic Couchbase setup""" 193 self.log.info("customizing setup") 194 195 self.set_loglevel() 196 self.customize_xdcr_settings() 197 self.set_autocompaction() 198 self.set_exp_pager_stime() 199 self.set_rebalance_options() 200 201 def set_rebalance_options(self): 202 # rebalanceMovesBeforeCompaction 203 rmbc = self.parami('rebalance_moves_before_compaction', 0) 204 if rmbc: 205 cmd = 'ns_config:set(rebalance_moves_before_compaction, {0}).'\ 206 .format(rmbc) 207 self.rest.diag_eval(cmd) 208 209 def set_exp_pager_stime(self): 210 exp_pager_stime = self.param('exp_pager_stime', 211 PerfDefaults.exp_pager_stime) 212 if exp_pager_stime != PerfDefaults.exp_pager_stime: 213 self.set_ep_param('flush_param', 'exp_pager_stime', exp_pager_stime) 214 215 def set_loglevel(self): 216 """Set custom loglevel""" 217 218 loglevel = self.param('loglevel', None) 219 if loglevel: 220 self.rest.set_global_loglevel(loglevel) 221 222 def set_mc_threads(self, node, mc_threads): 223 """Change number of memcached threads""" 224 rest = RestConnection(node) 225 rest.set_mc_threads(mc_threads) 226 self.log.info("num of memcached threads = {0}".format(mc_threads)) 227 228 def customize_xdcr_settings(self): 229 """Set custom XDCR environment variables""" 230 max_concurrent_reps_per_doc = self.param('max_concurrent_reps_per_doc', None) 231 xdcr_doc_batch_size_kb = self.param('xdcr_doc_batch_size_kb', None) 232 xdcr_checkpoint_interval = self.param('xdcr_checkpoint_interval', None) 233 xdcr_latency_optimization = self.param('xdcr_latency_optimization', None) 234 235 if max_concurrent_reps_per_doc: 236 param = 'xdcrMaxConcurrentReps' 237 value = max_concurrent_reps_per_doc 238 elif xdcr_doc_batch_size_kb: 239 param = 'xdcrDocBatchSizeKb' 240 value = xdcr_doc_batch_size_kb 241 elif xdcr_checkpoint_interval: 242 param = 'xdcrCheckpointInterval' 243 value = xdcr_checkpoint_interval 244 else: 245 return 246 247 self.log.info("changing {0} to {1}".format(param, value)) 248 249 for servers in self.input.clusters.values(): 250 RestConnection(servers[0]).set_internalSetting(param, value) 251 252 def set_ep_compaction(self, comp_ratio): 253 """Set up ep_engine side compaction ratio""" 254 for server in self.input.servers: 255 shell = RemoteMachineShellConnection(server) 256 cmd = "/opt/couchbase/bin/cbepctl localhost:11210 "\ 257 "set flush_param db_frag_threshold {0}".format(comp_ratio) 258 self._exec_and_log(shell, cmd) 259 shell.disconnect() 260 261 def set_autocompaction(self, disable_view_compaction=False): 262 """Set custom auto-compaction settings""" 263 264 try: 265 # Parallel database and view compaction 266 parallel_compaction = self.param("parallel_compaction", 267 PerfDefaults.parallel_compaction) 268 # Database fragmentation threshold 269 db_compaction = self.parami("db_compaction", 270 PerfDefaults.db_compaction) 271 self.log.info("database compaction = {0}".format(db_compaction)) 272 273 # ep_engine fragementation threshold 274 ep_compaction = self.parami("ep_compaction", 275 PerfDefaults.ep_compaction) 276 if ep_compaction != PerfDefaults.ep_compaction: 277 self.set_ep_compaction(ep_compaction) 278 self.log.info("ep_engine compaction = {0}".format(ep_compaction)) 279 280 # View fragmentation threshold 281 if disable_view_compaction: 282 view_compaction = 100 283 else: 284 view_compaction = self.parami("view_compaction", 285 PerfDefaults.view_compaction) 286 # Set custom auto-compaction settings 287 self.rest.set_auto_compaction(parallelDBAndVC=parallel_compaction, 288 dbFragmentThresholdPercentage=db_compaction, 289 viewFragmntThresholdPercentage=view_compaction) 290 except Exception as e: 291 # It's very hard to determine what exception it can raise. 292 # Therefore we have to use general handler. 293 self.log.error("Error while changing compaction settings: {0}" 294 .format(e)) 295 296 def set_ep_param(self, type, param, value): 297 """ 298 Set ep-engine specific param, using cbepctl 299 300 type: paramter type, e.g: flush_param, tap_param, etc 301 """ 302 bucket = Bucket(name=self.buckets[0], authType="sasl", saslPassword="") 303 for server in self.input.servers: 304 shell = RemoteMachineShellConnection(server) 305 shell.execute_cbepctl(bucket, 306 "", "set %s" % type, param, value) 307 shell.disconnect() 308 309 def tearDown(self): 310 if self.parami("tear_down", 0) == 1: 311 self.log.info("routine skipped") 312 return 313 314 self.log.info("routine starts") 315 316 if self.parami("tear_down_proxy", 1) == 1: 317 self.tear_down_proxy() 318 else: 319 self.log.info("proxy tearDown skipped") 320 321 if self.sc is not None: 322 self.sc.stop() 323 self.sc = None 324 325 if self.parami("tear_down_bucket", 0) == 1: 326 self.tear_down_buckets() 327 else: 328 self.log.info("bucket tearDown skipped") 329 330 if self.parami("tear_down_cluster", 1) == 1: 331 self.tear_down_cluster() 332 else: 333 self.log.info("cluster tearDown skipped") 334 335 self.log.info("routine finished") 336 337 def tear_down_buckets(self): 338 self.log.info("tearing down bucket") 339 BucketOperationHelper.delete_all_buckets_or_assert(self.input.servers, 340 self) 341 self.log.info("bucket teared down") 342 343 def tear_down_cluster(self): 344 self.log.info("tearing down cluster") 345 ClusterOperationHelper.cleanup_cluster(self.input.servers) 346 ClusterOperationHelper.wait_for_ns_servers_or_assert(self.input.servers, 347 self) 348 self.log.info("Cluster teared down") 349 350 def set_up_proxy(self, bucket=None): 351 """Set up and start Moxi""" 352 353 if self.input.moxis: 354 self.log.info("setting up proxy") 355 356 bucket = bucket or self.param('bucket', 'default') 357 358 shell = RemoteMachineShellConnection(self.input.moxis[0]) 359 shell.start_moxi(self.input.servers[0].ip, bucket, 360 self.input.moxis[0].port) 361 shell.disconnect() 362 363 def tear_down_proxy(self): 364 if len(self.input.moxis) > 0: 365 shell = RemoteMachineShellConnection(self.input.moxis[0]) 366 shell.stop_moxi() 367 shell.disconnect() 368 369 # Returns "host:port" of moxi to hit. 370 def target_host_port(self, bucket='default', use_direct=False): 371 rv = self.param('moxi', None) 372 if use_direct: 373 return "%s:%s" % (self.input.servers[0].ip, 374 '11210') 375 if rv: 376 return rv 377 if len(self.input.moxis) > 0: 378 return "%s:%s" % (self.input.moxis[0].ip, 379 self.input.moxis[0].port) 380 return "%s:%s" % (self.input.servers[0].ip, 381 self.rest.get_bucket(bucket).nodes[0].moxi) 382 383 def protocol_parse(self, protocol_in, use_direct=False): 384 if protocol_in.find('://') >= 0: 385 if protocol_in.find("couchbase:") >= 0: 386 protocol = "couchbase" 387 else: 388 protocol = \ 389 '-'.join(((["membase"] + 390 protocol_in.split("://"))[-2] + "-binary").split('-')[0:2]) 391 host_port = ('@' + protocol_in.split("://")[-1]).split('@')[-1] 392 user, pswd = (('@' + 393 protocol_in.split("://")[-1]).split('@')[-2] + 394 ":").split(':')[0:2] 395 else: 396 protocol = 'memcached-' + protocol_in 397 host_port = self.target_host_port(use_direct=use_direct) 398 user = self.param("rest_username", "Administrator") 399 pswd = self.param("rest_password", "password") 400 return protocol, host_port, user, pswd 401 402 def mk_protocol(self, host, port='8091', prefix='membase-binary'): 403 return self.param('protocol', 404 prefix + '://' + host + ':' + port) 405 406 def get_backups(self, protocol): 407 """ Get backup server lists for memcached-binary """ 408 port = protocol.split(":")[-1] 409 return map(lambda server: "%s:%s" % (server.ip, port), 410 self.input.servers[1:]) 411 412 def restartProxy(self, bucket=None): 413 self.tear_down_proxy() 414 self.set_up_proxy(bucket) 415 416 def set_up_dgm(self): 417 """Download fragmented, DGM dataset onto each cluster node, if not 418 already locally available. 419 420 The number of vbuckets and database schema must match the 421 target cluster. 422 423 Shutdown all cluster nodes. 424 425 Do a cluster-restore. 426 427 Restart all cluster nodes.""" 428 429 bucket = self.param("bucket", "default") 430 ClusterOperationHelper.stop_cluster(self.input.servers) 431 for server in self.input.servers: 432 remote = RemoteMachineShellConnection(server) 433 #TODO: Better way to pass num_nodes and db_size? 434 self.get_data_files(remote, bucket, 1, 10) 435 remote.disconnect() 436 ClusterOperationHelper.start_cluster(self.input.servers) 437 438 def get_data_files(self, remote, bucket, num_nodes, db_size): 439 base = 'https://s3.amazonaws.com/database-analysis' 440 dir = '/tmp/' 441 if remote.is_couchbase_installed(): 442 dir = dir + '/couchbase/{0}-{1}-{2}/'.format(num_nodes, 256, 443 db_size) 444 output, error = remote.execute_command('mkdir -p {0}'.format(dir)) 445 remote.log_command_output(output, error) 446 file = '{0}_cb.tar.gz'.format(bucket) 447 base_url = base + '/couchbase/{0}-{1}-{2}/{3}'.format(num_nodes, 448 256, db_size, 449 file) 450 else: 451 dir = dir + '/membase/{0}-{1}-{2}/'.format(num_nodes, 1024, 452 db_size) 453 output, error = remote.execute_command('mkdir -p {0}'.format(dir)) 454 remote.log_command_output(output, error) 455 file = '{0}_mb.tar.gz'.format(bucket) 456 base_url = base + '/membase/{0}-{1}-{2}/{3}'.format(num_nodes, 457 1024, db_size, 458 file) 459 460 461 info = remote.extract_remote_info() 462 wget_command = 'wget' 463 if info.type.lower() == 'windows': 464 wget_command = \ 465 "cd {0} ;cmd /c 'c:\\automation\\wget.exe --no-check-certificate"\ 466 .format(dir) 467 468 # Check if the file exists on the remote server else download the gzipped version 469 # Extract if necessary 470 exist = remote.file_exists(dir, file) 471 if not exist: 472 additional_quote = "" 473 if info.type.lower() == 'windows': 474 additional_quote = "'" 475 command = "{0} -v -O {1}{2} {3} {4} ".format(wget_command, dir, 476 file, base_url, 477 additional_quote) 478 output, error = remote.execute_command(command) 479 remote.log_command_output(output, error) 480 481 if remote.is_couchbase_installed(): 482 if info.type.lower() == 'windows': 483 destination_folder = testconstants.WIN_COUCHBASE_DATA_PATH 484 else: 485 destination_folder = testconstants.COUCHBASE_DATA_PATH 486 else: 487 if info.type.lower() == 'windows': 488 destination_folder = testconstants.WIN_MEMBASE_DATA_PATH 489 else: 490 destination_folder = testconstants.MEMBASE_DATA_PATH 491 if self.data_path: 492 destination_folder = self.data_path 493 untar_command = 'cd {1}; tar -xzf {0}'.format(dir + file, 494 destination_folder) 495 output, error = remote.execute_command(untar_command) 496 remote.log_command_output(output, error) 497 498 def _exec_and_log(self, shell, cmd): 499 """helper method to execute a command and log output""" 500 if not cmd or not shell: 501 return 502 503 output, error = shell.execute_command(cmd) 504 shell.log_command_output(output, error) 505 506 def _build_tar_name(self, bucket, version="unknown_version", 507 file_base=None): 508 """build tar file name. 509 510 {file_base}-{version}-{bucket}.tar.gz 511 """ 512 if not file_base: 513 file_base = os.path.splitext( 514 os.path.basename(self.param("conf_file", 515 PerfDefaults.conf_file)))[0] 516 return "{0}-{1}-{2}.tar.gz".format(file_base, version, bucket) 517 518 def _save_snapshot(self, server, bucket, file_base=None): 519 """Save data files to a snapshot""" 520 521 src_data_path = os.path.dirname(server.data_path or 522 testconstants.COUCHBASE_DATA_PATH) 523 dest_data_path = "{0}-snapshots".format(src_data_path) 524 525 self.log.info("server={0}, src_data_path={1}, dest_data_path={2}" 526 .format(server.ip, src_data_path, dest_data_path)) 527 528 shell = RemoteMachineShellConnection(server) 529 530 build_name, short_version, full_version = \ 531 shell.find_build_version("/opt/couchbase/", "VERSION.txt", "cb") 532 533 dest_file = self._build_tar_name(bucket, full_version, file_base) 534 535 self._exec_and_log(shell, "mkdir -p {0}".format(dest_data_path)) 536 537 # save as gzip file, if file exsits, overwrite 538 # TODO: multiple buckets 539 zip_cmd = "cd {0}; tar -cvzf {1}/{2} {3} {3}-data _*"\ 540 .format(src_data_path, dest_data_path, dest_file, bucket) 541 self._exec_and_log(shell, zip_cmd) 542 543 shell.disconnect() 544 return True 545 546 def _load_snapshot(self, server, bucket, file_base=None, overwrite=True): 547 """Load data files from a snapshot""" 548 549 dest_data_path = os.path.dirname(server.data_path or 550 testconstants.COUCHBASE_DATA_PATH) 551 src_data_path = "{0}-snapshots".format(dest_data_path) 552 553 self.log.info("server={0}, src_data_path={1}, dest_data_path={2}" 554 .format(server.ip, src_data_path, dest_data_path)) 555 556 shell = RemoteMachineShellConnection(server) 557 558 build_name, short_version, full_version = \ 559 shell.find_build_version("/opt/couchbase/", "VERSION.txt", "cb") 560 561 src_file = self._build_tar_name(bucket, full_version, file_base) 562 563 if not shell.file_exists(src_data_path, src_file): 564 self.log.error("file '{0}/{1}' does not exist" 565 .format(src_data_path, src_file)) 566 shell.disconnect() 567 return False 568 569 if not overwrite: 570 self._save_snapshot(server, bucket, 571 "{0}.tar.gz".format( 572 time.strftime(PerfDefaults.strftime))) # TODO: filename 573 574 rm_cmd = "rm -rf {0}/{1} {0}/{1}-data {0}/_*".format(dest_data_path, 575 bucket) 576 self._exec_and_log(shell, rm_cmd) 577 578 unzip_cmd = "cd {0}; tar -xvzf {1}/{2}".format(dest_data_path, 579 src_data_path, src_file) 580 self._exec_and_log(shell, unzip_cmd) 581 582 shell.disconnect() 583 return True 584 585 def save_snapshots(self, file_base, bucket): 586 """Save snapshots on all servers""" 587 if not self.input.servers or not bucket: 588 self.log.error("invalid server list or bucket name") 589 return False 590 591 ClusterOperationHelper.stop_cluster(self.input.servers) 592 593 for server in self.input.servers: 594 self._save_snapshot(server, bucket, file_base) 595 596 ClusterOperationHelper.start_cluster(self.input.servers) 597 598 return True 599 600 def load_snapshots(self, file_base, bucket): 601 """Load snapshots on all servers""" 602 if not self.input.servers or not bucket: 603 self.log.error("invalid server list or bucket name") 604 return False 605 606 ClusterOperationHelper.stop_cluster(self.input.servers) 607 608 for server in self.input.servers: 609 if not self._load_snapshot(server, bucket, file_base): 610 ClusterOperationHelper.start_cluster(self.input.servers) 611 return False 612 613 ClusterOperationHelper.start_cluster(self.input.servers) 614 615 return True 616 617 def spec(self, reference): 618 self.spec_reference = self.param("spec", reference) 619 620 def mk_stats(self, verbosity): 621 return StatsCollector(verbosity) 622 623 def _get_src_version(self): 624 """get testrunner version""" 625 try: 626 result = subprocess.Popen(['git', 'rev-parse', 'HEAD'], 627 stdout=subprocess.PIPE).communicate()[0] 628 except subprocess.CalledProcessError as e: 629 self.log.error("unable to get src code version : {0}".format(e)) 630 return "unknown version" 631 return result.rstrip()[:7] 632 633 def start_stats(self, stats_spec, servers=None, 634 process_names=('memcached', 'beam.smp'), test_params=None, 635 client_id='', collect_server_stats=True, ddoc=None): 636 if self.parami('stats', 1) == 0: 637 return None 638 639 servers = servers or self.input.servers 640 clusters = None 641 if hasattr(self, "get_region"): 642 if self.parami("access_phase", 0): 643 clusters = self.input.clusters 644 if self.get_region() == "west": 645 clusters[0], clusters[1] = clusters[1], clusters[0] 646 sc = self.mk_stats(False) 647 bucket = self.param("bucket", "default") 648 sc.start(servers, bucket, process_names, stats_spec, client_id, 649 collect_server_stats=collect_server_stats, ddoc=ddoc, 650 clusters=clusters) 651 test_params['testrunner'] = self._get_src_version() 652 self.test_params = test_params 653 self.sc = sc 654 return self.sc 655 656 def end_stats(self, sc, total_stats=None, stats_spec=None): 657 if sc is None: 658 return 659 if stats_spec is None: 660 stats_spec = self.spec_reference 661 if total_stats: 662 sc.total_stats(total_stats) 663 self.log.info("stopping stats collector") 664 sc.stop() 665 self.log.info("stats collector is stopped") 666 sc.export(stats_spec, self.test_params) 667 668 def load(self, num_items, min_value_size=None, 669 kind='binary', 670 protocol='binary', 671 ratio_sets=1.0, 672 ratio_hot_sets=0.0, 673 ratio_hot_gets=0.0, 674 ratio_expirations=0.0, 675 expiration=None, 676 prefix="", 677 doc_cache=1, 678 use_direct=True, 679 report=0, 680 start_at=-1, 681 collect_server_stats=True, 682 is_eperf=False, 683 hot_shift=0): 684 cfg = {'max-items': num_items, 685 'max-creates': num_items, 686 'max-ops-per-sec': self.parami("load_mcsoda_max_ops_sec", 687 PerfDefaults.mcsoda_max_ops_sec), 688 'min-value-size': min_value_size or self.parami("min_value_size", 689 1024), 690 'ratio-sets': self.paramf("load_ratio_sets", ratio_sets), 691 'ratio-misses': self.paramf("load_ratio_misses", 0.0), 692 'ratio-creates': self.paramf("load_ratio_creates", 1.0), 693 'ratio-deletes': self.paramf("load_ratio_deletes", 0.0), 694 'ratio-hot': 0.0, 695 'ratio-hot-sets': ratio_hot_sets, 696 'ratio-hot-gets': ratio_hot_gets, 697 'ratio-expirations': ratio_expirations, 698 'expiration': expiration or 0, 699 'exit-after-creates': 1, 700 'json': int(kind == 'json'), 701 'batch': self.parami("batch", PerfDefaults.batch), 702 'vbuckets': self.vbucket_count, 703 'doc-cache': doc_cache, 704 'prefix': prefix, 705 'report': report, 706 'hot-shift': hot_shift, 707 'cluster_name': self.param("cluster_name", "")} 708 cur = {} 709 if start_at >= 0: 710 cur['cur-items'] = start_at 711 cur['cur-gets'] = start_at 712 cur['cur-sets'] = start_at 713 cur['cur-ops'] = cur['cur-gets'] + cur['cur-sets'] 714 cur['cur-creates'] = start_at 715 cfg['max-creates'] = start_at + num_items 716 cfg['max-items'] = cfg['max-creates'] 717 718 cfg_params = cfg.copy() 719 cfg_params['test_time'] = time.time() 720 cfg_params['test_name'] = self.id() 721 722 # phase: 'load' or 'reload' 723 phase = "load" 724 if self.parami("hot_load_phase", 0) == 1: 725 # all gets 726 if self.parami("hot_load_get", PerfDefaults.hot_load_get) == 1: 727 cfg['ratio-sets'] = 0 728 cfg['exit-after-creates'] = 0 729 cfg['exit-after-gets'] = 1 730 cfg['max-gets'] = start_at + num_items 731 phase = "reload" 732 733 if is_eperf: 734 collect_server_stats = self.parami("prefix", 0) == 0 735 client_id = self.parami("prefix", 0) 736 sc = self.start_stats("{0}.{1}".format(self.spec_reference, phase), # stats spec e.x: testname.load 737 test_params=cfg_params, client_id=client_id, 738 collect_server_stats=collect_server_stats) 739 740 # For Black box, multi node tests 741 # always use membase-binary 742 if self.is_multi_node: 743 protocol = self.mk_protocol(host=self.input.servers[0].ip, 744 port=self.input.servers[0].port) 745 746 protocol, host_port, user, pswd = \ 747 self.protocol_parse(protocol, use_direct=use_direct) 748 749 if not user.strip(): 750 if "11211" in host_port: 751 user = self.param("bucket", "default") 752 else: 753 user = self.input.servers[0].rest_username 754 if not pswd.strip(): 755 if not "11211" in host_port: 756 pswd = self.input.servers[0].rest_password 757 758 self.log.info("mcsoda %s %s %s %s" % 759 (protocol, host_port, user, pswd)) 760 self.log.info("mcsoda cfg:\n" + pprint.pformat(cfg)) 761 self.log.info("mcsoda cur:\n" + pprint.pformat(cfg)) 762 763 cur, start_time, end_time = \ 764 self.mcsoda_run(cfg, cur, protocol, host_port, user, pswd, 765 stats_collector=sc, heartbeat=self.parami("mcsoda_heartbeat", 0), 766 why="load", bucket=self.param("bucket", "default")) 767 self.num_items_loaded = num_items 768 ops = {'tot-sets': cur.get('cur-sets', 0), 769 'tot-gets': cur.get('cur-gets', 0), 770 'tot-items': cur.get('cur-items', 0), 771 'tot-creates': cur.get('cur-creates', 0), 772 'tot-misses': cur.get('cur-misses', 0), 773 "start-time": start_time, 774 "end-time": end_time} 775 776 if is_eperf: 777 if self.parami("load_wait_until_drained", 1) == 1: 778 self.wait_until_drained() 779 if self.parami("load_wait_until_repl", 780 PerfDefaults.load_wait_until_repl) == 1: 781 self.wait_until_repl() 782 self.end_stats(sc, ops, "{0}.{1}".format(self.spec_reference, 783 phase)) 784 785 return ops, start_time, end_time 786 787 def mcsoda_run(self, cfg, cur, protocol, host_port, user, pswd, 788 stats_collector=None, stores=None, ctl=None, 789 heartbeat=0, why="", bucket="default", backups=None): 790 return mcsoda.run(cfg, cur, protocol, host_port, user, pswd, 791 stats_collector=stats_collector, 792 stores=stores, 793 ctl=ctl, 794 heartbeat=heartbeat, 795 why=why, 796 bucket=bucket, 797 backups=backups) 798 799 def rebalance_nodes(self, num_nodes, cluster=None): 800 """Rebalance cluster(s) if more than 1 node provided""" 801 if len(self.input.servers) == 1 or num_nodes == 1: 802 self.log.warn("running on single node cluster") 803 return 804 else: 805 self.log.info("rebalancing nodes - num_nodes = {0}" 806 .format(num_nodes)) 807 808 if not cluster: 809 cluster = self.input.servers 810 status, _ = RebalanceHelper.rebalance_in(cluster, num_nodes - 1, 811 do_shuffle=False) 812 self.assertTrue(status) 813 814 def delayed_rebalance_worker(self, servers, num_nodes, delay_seconds, sc, 815 max_retries=PerfDefaults.reb_max_retries, 816 reb_mode=PerfDefaults.REB_MODE.IN): 817 time.sleep(delay_seconds) 818 gmt_now = time.strftime(PerfDefaults.strftime, time.gmtime()) 819 self.log.info("rebalance started") 820 821 if not sc: 822 self.log.error("invalid stats collector") 823 return 824 status = False 825 retries = 0 826 while not status and retries <= max_retries: 827 start_time = time.time() 828 if reb_mode == PerfDefaults.REB_MODE.OUT: 829 status, nodes = RebalanceHelper.rebalance_out(servers, num_nodes) 830 elif reb_mode == PerfDefaults.REB_MODE.SWAP: 831 status, nodes = RebalanceHelper.rebalance_swap(servers, num_nodes) 832 else: 833 status, nodes = RebalanceHelper.rebalance_in(servers, 834 num_nodes - 1, do_check=(not retries)) 835 end_time = time.time() 836 self.log.info("status: {0}, nodes: {1}, retries: {2}" 837 .format(status, nodes, retries)) 838 if not status: 839 retries += 1 840 time.sleep(delay_seconds) 841 sc.reb_stats(start_time, end_time - start_time) 842 if self.parami("master_events", PerfDefaults.master_events): 843 filename = "master_events.log" 844 with open(filename, "w") as f: 845 f.write(self.rest.diag_master_events()[1]) 846 847 def delayed_rebalance(self, num_nodes, delay_seconds=10, 848 max_retries=PerfDefaults.reb_max_retries, 849 reb_mode=0, sync=False): 850 self.log.info("delayed_rebalance") 851 if sync: 852 PerfBase.delayed_rebalance_worker(self, self.input.servers, 853 num_nodes, delay_seconds, self.sc, max_retries, reb_mode) 854 else: 855 t = threading.Thread(target=PerfBase.delayed_rebalance_worker, 856 args=(self, self.input.servers, num_nodes, 857 delay_seconds, self.sc, max_retries, reb_mode)) 858 t.daemon = True 859 t.start() 860 861 @staticmethod 862 def set_auto_compaction(server, parallel_compaction, percent_threshold): 863 rest = RestConnection(server) 864 rest.set_auto_compaction(parallel_compaction, 865 dbFragmentThresholdPercentage=percent_threshold, 866 viewFragmntThresholdPercentage=percent_threshold) 867 868 @staticmethod 869 def delayed_compaction_worker(servers, parallel_compaction, 870 percent_threshold, delay_seconds): 871 time.sleep(delay_seconds) 872 PerfBase.set_auto_compaction(servers[0], parallel_compaction, 873 percent_threshold) 874 875 def delayed_compaction(self, parallel_compaction="false", 876 percent_threshold=0.01, 877 delay_seconds=10): 878 t = threading.Thread(target=PerfBase.delayed_compaction_worker, 879 args=(self.input.servers, 880 parallel_compaction, 881 percent_threshold, 882 delay_seconds)) 883 t.daemon = True 884 t.start() 885 886 def loop(self, num_ops=None, 887 num_items=None, 888 max_items=None, 889 max_creates=None, 890 min_value_size=None, 891 exit_after_creates=0, 892 kind='binary', 893 protocol='binary', 894 clients=1, 895 ratio_misses=0.0, 896 ratio_sets=0.0, ratio_creates=0.0, ratio_deletes=0.0, 897 ratio_hot=0.2, ratio_hot_sets=0.95, ratio_hot_gets=0.95, 898 ratio_expirations=0.0, 899 expiration=None, 900 test_name=None, 901 prefix="", 902 doc_cache=1, 903 use_direct=True, 904 collect_server_stats=True, 905 start_at=-1, 906 report=0, 907 ctl=None, 908 hot_shift=0, 909 is_eperf=False, 910 ratio_queries=0, 911 queries=0, 912 ddoc=None): 913 num_items = num_items or self.num_items_loaded 914 915 hot_stack_size = \ 916 self.parami('hot_stack_size', PerfDefaults.hot_stack_size) or \ 917 (num_items * ratio_hot) 918 919 cfg = {'max-items': max_items or num_items, 920 'max-creates': max_creates or 0, 921 'max-ops-per-sec': self.parami("mcsoda_max_ops_sec", 922 PerfDefaults.mcsoda_max_ops_sec), 923 'min-value-size': min_value_size or self.parami("min_value_size", 924 1024), 925 'exit-after-creates': exit_after_creates, 926 'ratio-sets': ratio_sets, 927 'ratio-misses': ratio_misses, 928 'ratio-creates': ratio_creates, 929 'ratio-deletes': ratio_deletes, 930 'ratio-hot': ratio_hot, 931 'ratio-hot-sets': ratio_hot_sets, 932 'ratio-hot-gets': ratio_hot_gets, 933 'ratio-expirations': ratio_expirations, 934 'ratio-queries': ratio_queries, 935 'expiration': expiration or 0, 936 'threads': clients, 937 'json': int(kind == 'json'), 938 'batch': self.parami("batch", PerfDefaults.batch), 939 'vbuckets': self.vbucket_count, 940 'doc-cache': doc_cache, 941 'prefix': prefix, 942 'queries': queries, 943 'report': report, 944 'hot-shift': hot_shift, 945 'hot-stack': self.parami("hot_stack", PerfDefaults.hot_stack), 946 'hot-stack-size': hot_stack_size, 947 'hot-stack-rotate': self.parami("hot_stack_rotate", 948 PerfDefaults.hot_stack_rotate), 949 'cluster_name': self.param("cluster_name", ""), 950 'observe': self.param("observe", PerfDefaults.observe), 951 'obs-backoff': self.paramf('obs_backoff', 952 PerfDefaults.obs_backoff), 953 'obs-max-backoff': self.paramf('obs_max_backoff', 954 PerfDefaults.obs_max_backoff), 955 'obs-persist-count': self.parami('obs_persist_count', 956 PerfDefaults.obs_persist_count), 957 'obs-repl-count': self.parami('obs_repl_count', 958 PerfDefaults.obs_repl_count), 959 'woq-pattern': self.parami('woq_pattern', 960 PerfDefaults.woq_pattern), 961 'woq-verbose': self.parami('woq_verbose', 962 PerfDefaults.woq_verbose), 963 'cor-pattern': self.parami('cor_pattern', 964 PerfDefaults.cor_pattern), 965 'cor-persist': self.parami('cor_persist', 966 PerfDefaults.cor_persist), 967 'time': self.parami('time', 0), 968 'cbm': self.parami('cbm', PerfDefaults.cbm), 969 'cbm-host': self.param('cbm_host', PerfDefaults.cbm_host), 970 'cbm-port': self.parami('cbm_port', PerfDefaults.cbm_port)} 971 972 cfg_params = cfg.copy() 973 cfg_params['test_time'] = time.time() 974 cfg_params['test_name'] = test_name 975 client_id = '' 976 stores = None 977 978 if is_eperf: 979 client_id = self.parami("prefix", 0) 980 sc = None 981 if self.parami("collect_stats", 1): 982 sc = self.start_stats(self.spec_reference + ".loop", 983 test_params=cfg_params, client_id=client_id, 984 collect_server_stats=collect_server_stats, 985 ddoc=ddoc) 986 987 self.cur = {'cur-items': num_items} 988 if start_at >= 0: 989 self.cur['cur-gets'] = start_at 990 if num_ops is None: 991 num_ops = num_items 992 if isinstance(num_ops, int): 993 cfg['max-ops'] = num_ops 994 else: 995 # Here, we num_ops looks like "time to run" tuple of... 996 # ('seconds', integer_num_of_seconds_to_run) 997 cfg['time'] = num_ops[1] 998 999 # For Black box, multi node tests 1000 # always use membase-binary 1001 if self.is_multi_node: 1002 protocol = self.mk_protocol(host=self.input.servers[0].ip, 1003 port=self.input.servers[0].port) 1004 1005 backups = self.get_backups(protocol) 1006 self.log.info("mcsoda protocol %s" % protocol) 1007 protocol, host_port, user, pswd = \ 1008 self.protocol_parse(protocol, use_direct=use_direct) 1009 1010 if not user.strip(): 1011 if "11211" in host_port: 1012 user = self.param("bucket", "default") 1013 else: 1014 user = self.input.servers[0].rest_username 1015 if not pswd.strip(): 1016 if not "11211" in host_port: 1017 pswd = self.input.servers[0].rest_password 1018 1019 self.log.info("mcsoda %s %s %s %s" % 1020 (protocol, host_port, user, pswd)) 1021 self.log.info("mcsoda cfg:\n" + pprint.pformat(cfg)) 1022 self.log.info("mcsoda cur:\n" + pprint.pformat(cfg)) 1023 self.log.info("mcsoda backups: %s" % backups) 1024 1025 # For query tests always use StoreCouchbase 1026 if protocol == "couchbase": 1027 stores = [StoreCouchbase()] 1028 1029 self.cur, start_time, end_time = \ 1030 self.mcsoda_run(cfg, self.cur, protocol, host_port, user, pswd, 1031 stats_collector=sc, ctl=ctl, stores=stores, 1032 heartbeat=self.parami("mcsoda_heartbeat", 0), 1033 why="loop", bucket=self.param("bucket", "default"), 1034 backups=backups) 1035 1036 ops = {'tot-sets': self.cur.get('cur-sets', 0), 1037 'tot-gets': self.cur.get('cur-gets', 0), 1038 'tot-items': self.cur.get('cur-items', 0), 1039 'tot-creates': self.cur.get('cur-creates', 0), 1040 'tot-misses': self.cur.get('cur-misses', 0), 1041 "start-time": start_time, 1042 "end-time": end_time} 1043 1044 if self.parami("loop_wait_until_drained", 1045 PerfDefaults.loop_wait_until_drained): 1046 self.wait_until_drained() 1047 1048 if self.parami("loop_wait_until_repl", 1049 PerfDefaults.loop_wait_until_repl): 1050 self.wait_until_repl() 1051 1052 if self.parami("collect_stats", 1) and \ 1053 not self.parami("reb_no_fg", PerfDefaults.reb_no_fg): 1054 self.end_stats(sc, ops, self.spec_reference + ".loop") 1055 1056 why = self.params("why", "main") 1057 prefix = self.parami("prefix", 0) 1058 self.log.info("finished") 1059 1060 return ops, start_time, end_time 1061 1062 def wait_until_drained(self): 1063 self.log.info("draining disk write queue") 1064 1065 master = self.input.servers[0] 1066 bucket = self.param("bucket", "default") 1067 ready = RebalanceHelper.wait_for_persistence(master, bucket) 1068 self.assertTrue(ready, "not all items persisted. see logs") 1069 1070 self.log.info("disk write queue has been drained") 1071 return time.time() 1072 1073 def wait_until_repl(self): 1074 self.log.info("waiting for replication") 1075 1076 master = self.input.servers[0] 1077 bucket = self.param("bucket", "default") 1078 1079 RebalanceHelper.wait_for_stats_on_all(master, bucket, 1080 'vb_replica_queue_size', 0, 1081 fn=RebalanceHelper.wait_for_stats_no_timeout) 1082 1083 RebalanceHelper.wait_for_stats_on_all(master, bucket, 1084 'ep_tap_replica_queue_itemondisk', 0, 1085 fn=RebalanceHelper.wait_for_stats_no_timeout) 1086 1087 RebalanceHelper.wait_for_stats_on_all(master, bucket, 1088 'ep_tap_rebalance_queue_backfillremaining', 0, 1089 fn=RebalanceHelper.wait_for_stats_no_timeout) 1090 1091 RebalanceHelper.wait_for_stats_on_all(master, bucket, 1092 'ep_tap_replica_qlen', 0, 1093 fn=RebalanceHelper.wait_for_stats_no_timeout) 1094 1095 self.log.info("replication is done") 1096 1097 def warmup(self, collect_stats=True, flush_os_cache=False): 1098 """ 1099 Restart cluster and wait for it to warm up. 1100 In current version, affect the master node only. 1101 """ 1102 if not self.input.servers: 1103 self.log.error("empty server list") 1104 return 1105 1106 if collect_stats: 1107 client_id = self.parami("prefix", 0) 1108 test_params = {'test_time': time.time(), 1109 'test_name': self.id(), 1110 'json': 0} 1111 sc = self.start_stats(self.spec_reference + ".warmup", 1112 test_params=test_params, 1113 client_id=client_id) 1114 1115 self.log.info("preparing to warmup cluster ...") 1116 1117 server = self.input.servers[0] 1118 shell = RemoteMachineShellConnection(server) 1119 1120 start_time = time.time() 1121 1122 self.log.info("stopping couchbase ... ({0})".format(server.ip)) 1123 shell.stop_couchbase() 1124 self.log.info("couchbase stopped ({0})".format(server.ip)) 1125 1126 if flush_os_cache: 1127 self.log.info("flushing os cache ...") 1128 shell.flush_os_caches() 1129 1130 shell.start_couchbase() 1131 self.log.info("couchbase restarted ({0})".format(server.ip)) 1132 1133 self.wait_until_warmed_up() 1134 self.log.info("warmup finished") 1135 1136 end_time = time.time() 1137 ops = {'tot-sets': 0, 1138 'tot-gets': 0, 1139 'tot-items': 0, 1140 'tot-creates': 0, 1141 'tot-misses': 0, 1142 "start-time": start_time, 1143 "end-time": end_time} 1144 1145 if collect_stats: 1146 self.end_stats(sc, ops, self.spec_reference + ".warmup") 1147 1148 def wait_until_warmed_up(self, master=None): 1149 if not master: 1150 master = self.input.servers[0] 1151 1152 bucket = self.param("bucket", "default") 1153 1154 fn = RebalanceHelper.wait_for_mc_stats_no_timeout 1155 for bucket in self.buckets: 1156 RebalanceHelper.wait_for_stats_on_all(master, bucket, 1157 'ep_warmup_thread', 1158 'complete', fn=fn) 1159 def set_param(self, name, val): 1160 1161 input = getattr(self, "input", TestInputSingleton.input) 1162 input.test_params[name] = str(val) 1163 1164 return True 1165 1166 def wait_for_task_completion(self, task='indexer'): 1167 """Wait for ns_server task to finish""" 1168 t0 = time.time() 1169 self.log.info("Waiting 30 seconds before {0} monitoring".format(task)) 1170 time.sleep(30) 1171 1172 while True: 1173 tasks = self.rest.ns_server_tasks() 1174 if tasks: 1175 try: 1176 progress = [t['progress'] for t in tasks if t['type'] == task] 1177 except TypeError: 1178 self.log.error(tasks) 1179 else: 1180 if progress: 1181 self.log.info("{0} progress: {1}".format(task, progress)) 1182 time.sleep(10) 1183 else: 1184 break 1185 1186 t1 = time.time() 1187 self.log.info("Time taken to perform task: {0} sec".format(t1 - t0)) 1188 1189 def param(self, name, default_value): 1190 input = getattr(self, "input", TestInputSingleton.input) 1191 return input.test_params.get(name, default_value) 1192 1193 def parami(self, name, default_int): 1194 return int(self.param(name, default_int)) 1195 1196 def paramf(self, name, default_float): 1197 return float(self.param(name, default_float)) 1198 1199 def params(self, name, default_str): 1200 return str(self.param(name, default_str)) 1201