1import unittest
2import time
3import threading
4import os
5import subprocess
6import pprint
7import logging
8
9from TestInput import TestInputSingleton
10from lib import logger
11from lib import testconstants
12from lib.membase.api.rest_client import RestConnection, RestHelper, Bucket
13from lib.membase.helper.bucket_helper import BucketOperationHelper
14from lib.membase.helper.cluster_helper import ClusterOperationHelper
15from lib.membase.helper.rebalance_helper import RebalanceHelper
16from lib.membase.performance.stats import StatsCollector
17from lib.remote.remote_util import RemoteMachineShellConnection
18from lib.perf_engines.cbsoda import StoreCouchbase
19
20from pytests.performance.perf_defaults import PerfDefaults
21from lib.perf_engines import mcsoda
22
23
24class PerfBase(unittest.TestCase):
25
26    """
27    specURL = http://hub.internal.couchbase.org/confluence/display/cbit/Black+Box+Performance+Test+Matrix
28
29    """
30
31    # The setUpBaseX() methods allow subclasses to resequence the setUp() and
32    # skip cluster configuration.
33    def setUpBase0(self):
34        self.log = logger.Logger.get_logger()
35        self.input = TestInputSingleton.input
36        if self.input.param("log_level", None):
37            self.log.setLevel(level=0)
38            for hd in self.log.handlers:
39                if str(hd.__class__).find('FileHandler') != -1:
40                    hd.setLevel(level=logging.DEBUG)
41                else:
42                    hd.setLevel(level=getattr(logging, self.input.param("log_level", None)))
43        self.vbucket_count = PerfDefaults.vbuckets
44        self.sc = None
45        if self.parami("tear_down_on_setup",
46                       PerfDefaults.tear_down_on_setup) == 1:
47            self.tearDown()  # Tear down in case previous run had unclean death
48        master = self.input.servers[0]
49        self.set_up_rest(master)
50
51    def setUpBase1(self):
52        if max(self.parami('num_buckets', 1),
53               self.parami('xdcr_num_buckets', 1)) > 1:
54            bucket = 'bucket-0'
55        else:
56            bucket = self.param('bucket', 'default')
57        vBuckets = self.rest.get_vbuckets(bucket)
58        self.vbucket_count = len(vBuckets) if vBuckets else 0
59
    def setUp(self):
        """Full cluster setup: base init, memcached/erlang tuning, cluster
        init, rebalance, buckets, Moxi proxy, custom settings, optional DGM
        dataset, then wait for warmup and flush OS caches.

        Each configuration step runs once per cluster when the test input
        carries multiple clusters (XDCR style), else once on the single
        cluster.
        """
        self.setUpBase0()

        # Optionally override memcached worker threads on every node before
        # anything else touches the cluster.
        mc_threads = self.parami("mc_threads", PerfDefaults.mc_threads)
        if mc_threads != PerfDefaults.mc_threads:
            for node in self.input.servers:
                self.set_mc_threads(node, mc_threads)

        erlang_schedulers = self.param("erlang_schedulers",
                                       PerfDefaults.erlang_schedulers)
        if erlang_schedulers:
            ClusterOperationHelper.set_erlang_schedulers(self.input.servers,
                                                         erlang_schedulers)
        master = self.input.servers[0]

        self.is_multi_node = False
        self.data_path = master.data_path

        # Number of items loaded by load() method.
        # Does not include or count any items that came from set_up_dgm().
        #
        self.num_items_loaded = 0

        if self.input.clusters:
            for cluster in self.input.clusters.values():
                master = cluster[0]
                self.set_up_rest(master)
                self.set_up_cluster(master)
        else:
            master = self.input.servers[0]
            self.set_up_cluster(master)

        # Rebalance
        if self.input.clusters:
            for cluster in self.input.clusters.values():
                num_nodes = self.parami("num_nodes_before", len(cluster))
                self.rebalance_nodes(num_nodes, cluster)
        else:
            num_nodes = self.parami("num_nodes", 10)
            self.rebalance_nodes(num_nodes)

        if self.input.clusters:
            for cluster in self.input.clusters.values():
                master = cluster[0]
                self.set_up_rest(master)
                self.set_up_buckets()
        else:
            self.set_up_buckets()

        self.set_up_proxy()

        if self.input.clusters:
            for cluster in self.input.clusters.values():
                master = cluster[0]
                self.set_up_rest(master)
                self.reconfigure()
        else:
            self.reconfigure()

        # Optionally restore a prebuilt disk-greater-than-memory dataset.
        if self.parami("dgm", getattr(self, "dgm", 1)):
            self.set_up_dgm()

        time.sleep(10)
        self.setUpBase1()

        if self.input.clusters:
            for cluster in self.input.clusters.values():
                self.wait_until_warmed_up(cluster[0])
        else:
            self.wait_until_warmed_up()
        ClusterOperationHelper.flush_os_caches(self.input.servers)
131
    def set_up_rest(self, master):
        """Point self.rest / self.rest_helper at the given master node."""
        self.rest = RestConnection(master)
        self.rest_helper = RestHelper(self.rest)
135
    def set_up_cluster(self, master):
        """Initialize the cluster: credentials and memory quota via REST."""
        self.log.info("setting up cluster")

        self.rest.init_cluster(master.rest_username, master.rest_password)

        # Cluster-wide memory quota in MB, from the "mem_quota" test param.
        memory_quota = self.parami('mem_quota', PerfDefaults.mem_quota)
        self.rest.init_cluster_memoryQuota(master.rest_username,
                                           master.rest_password,
                                           memoryQuota=memory_quota)
146
147    def _get_bucket_names(self, num_buckets):
148        """
149        Get a list of bucket names
150        """
151        if num_buckets > 1:
152            buckets = ['bucket-{0}'.format(i) for i in range(num_buckets)]
153        else:
154            buckets = [self.param('bucket', 'default')]
155
156        return buckets
157
158    def get_bucket_conf(self):
159        """ retrieve bucket configurations"""
160
161        num_buckets = max(self.parami('num_buckets', 1),
162                          self.parami('xdcr_num_buckets', 1))
163        self.buckets = self._get_bucket_names(num_buckets)
164
165    def set_up_buckets(self):
166        """Set up data bucket(s)"""
167
168        self.log.info("setting up buckets")
169
170        self.get_bucket_conf()
171
172        for bucket in self.buckets:
173            bucket_ram_quota = self.parami('mem_quota', PerfDefaults.mem_quota)
174            bucket_threads_num = self.parami('threads_number', PerfDefaults.threads_number)
175            bucket_ram_quota /= max(self.parami('num_buckets', 1),
176                                    self.parami('xdcr_num_buckets', 1))
177            replicas = self.parami('replicas', getattr(self, 'replicas', 1))
178            index_replicas = self.parami('index_replicas', 0)
179
180            self.rest.create_bucket(bucket=bucket, ramQuotaMB=bucket_ram_quota,
181                                    replicaNumber=replicas, authType='sasl',
182                                    threadsNumber=bucket_threads_num,
183                                    replica_index=index_replicas)
184
185            status = self.rest_helper.vbucket_map_ready(bucket, 60)
186            self.assertTrue(status, msg='vbucket_map not ready .. timed out')
187            status = self.rest_helper.bucket_exists(bucket)
188            self.assertTrue(status,
189                            msg='unable to create {0} bucket'.format(bucket))
190
191    def reconfigure(self):
192        """Customize basic Couchbase setup"""
193        self.log.info("customizing setup")
194
195        self.set_loglevel()
196        self.customize_xdcr_settings()
197        self.set_autocompaction()
198        self.set_exp_pager_stime()
199        self.set_rebalance_options()
200
201    def set_rebalance_options(self):
202        # rebalanceMovesBeforeCompaction
203        rmbc = self.parami('rebalance_moves_before_compaction', 0)
204        if rmbc:
205            cmd = 'ns_config:set(rebalance_moves_before_compaction, {0}).'\
206                .format(rmbc)
207            self.rest.diag_eval(cmd)
208
209    def set_exp_pager_stime(self):
210        exp_pager_stime = self.param('exp_pager_stime',
211                                     PerfDefaults.exp_pager_stime)
212        if exp_pager_stime != PerfDefaults.exp_pager_stime:
213            self.set_ep_param('flush_param', 'exp_pager_stime', exp_pager_stime)
214
215    def set_loglevel(self):
216        """Set custom loglevel"""
217
218        loglevel = self.param('loglevel', None)
219        if loglevel:
220            self.rest.set_global_loglevel(loglevel)
221
222    def set_mc_threads(self, node, mc_threads):
223        """Change number of memcached threads"""
224        rest = RestConnection(node)
225        rest.set_mc_threads(mc_threads)
226        self.log.info("num of memcached threads = {0}".format(mc_threads))
227
228    def customize_xdcr_settings(self):
229        """Set custom XDCR environment variables"""
230        max_concurrent_reps_per_doc = self.param('max_concurrent_reps_per_doc', None)
231        xdcr_doc_batch_size_kb = self.param('xdcr_doc_batch_size_kb', None)
232        xdcr_checkpoint_interval = self.param('xdcr_checkpoint_interval', None)
233        xdcr_latency_optimization = self.param('xdcr_latency_optimization', None)
234
235        if max_concurrent_reps_per_doc:
236            param = 'xdcrMaxConcurrentReps'
237            value = max_concurrent_reps_per_doc
238        elif xdcr_doc_batch_size_kb:
239            param = 'xdcrDocBatchSizeKb'
240            value = xdcr_doc_batch_size_kb
241        elif xdcr_checkpoint_interval:
242            param = 'xdcrCheckpointInterval'
243            value = xdcr_checkpoint_interval
244        else:
245            return
246
247        self.log.info("changing {0} to {1}".format(param, value))
248
249        for servers in self.input.clusters.values():
250            rest_conn = RestConnection(servers[0])
251            replications = rest_conn.get_replications()
252            for repl in replications:
253                src_bucket = repl.get_src_bucket()
254                dst_bucket = repl.get_dest_bucket()
255                rest_conn.set_xdcr_param(src_bucket.name, dst_bucket.name, param, value)
256
257
258    def set_ep_compaction(self, comp_ratio):
259        """Set up ep_engine side compaction ratio"""
260        for server in self.input.servers:
261            shell = RemoteMachineShellConnection(server)
262            cmd = "/opt/couchbase/bin/cbepctl localhost:11210 "\
263                  "set flush_param db_frag_threshold {0}".format(comp_ratio)
264            self._exec_and_log(shell, cmd)
265            shell.disconnect()
266
    def set_autocompaction(self, disable_view_compaction=False):
        """Set custom auto-compaction settings.

        disable_view_compaction: when True, pin the view fragmentation
        threshold to 100%, effectively disabling view compaction.
        Errors are logged, never raised.
        """

        try:
            # Parallel database and view compaction
            parallel_compaction = self.param("parallel_compaction",
                                             PerfDefaults.parallel_compaction)
            # Database fragmentation threshold
            db_compaction = self.parami("db_compaction",
                                        PerfDefaults.db_compaction)
            self.log.info("database compaction = {0}".format(db_compaction))

            # ep_engine fragementation threshold; only pushed when it
            # differs from the default (requires a remote cbepctl call).
            ep_compaction = self.parami("ep_compaction",
                                        PerfDefaults.ep_compaction)
            if ep_compaction != PerfDefaults.ep_compaction:
                self.set_ep_compaction(ep_compaction)
                self.log.info("ep_engine compaction = {0}".format(ep_compaction))

            # View fragmentation threshold
            if disable_view_compaction:
                view_compaction = 100
            else:
                view_compaction = self.parami("view_compaction",
                                              PerfDefaults.view_compaction)
            # Set custom auto-compaction settings
            # NOTE(review): 'viewFragmntThresholdPercentage' spelling
            # presumably matches the rest_client kwarg -- confirm there.
            self.rest.set_auto_compaction(parallelDBAndVC=parallel_compaction,
                                          dbFragmentThresholdPercentage=db_compaction,
                                          viewFragmntThresholdPercentage=view_compaction)
        except Exception as e:
            # It's very hard to determine what exception it can raise.
            # Therefore we have to use general handler.
            self.log.error("Error while changing compaction settings: {0}"
                           .format(e))
301
    def set_ep_param(self, type, param, value):
        """
        Set ep-engine specific param, using cbepctl

        type: paramter type, e.g: flush_param, tap_param, etc
        param: parameter name, e.g: exp_pager_stime
        value: new value for the parameter

        Applies to the first bucket only, on every server in the cluster.
        NOTE: 'type' shadows the builtin; kept as-is so keyword callers
        don't break.
        """
        bucket = Bucket(name=self.buckets[0], authType="sasl", saslPassword="")
        for server in self.input.servers:
            shell = RemoteMachineShellConnection(server)
            shell.execute_cbepctl(bucket,
                                  "", "set %s" % type, param, value)
            shell.disconnect()
314
    def tearDown(self):
        """Tear down proxy, stats collector, buckets and cluster, each step
        gated by its own test param."""
        # NOTE(review): "tear_down" == 1 *skips* the routine -- the flag
        # reads inverted; confirm against the conf files before changing.
        if self.parami("tear_down", 0) == 1:
            self.log.info("routine skipped")
            return

        self.log.info("routine starts")

        if self.parami("tear_down_proxy", 1) == 1:
            self.tear_down_proxy()
        else:
            self.log.info("proxy tearDown skipped")

        # Stop a stats collector still running from an earlier phase.
        if self.sc is not None:
            self.sc.stop()
            self.sc = None

        if self.parami("tear_down_bucket", 0) == 1:
            self.tear_down_buckets()
        else:
            self.log.info("bucket tearDown skipped")

        if self.parami("tear_down_cluster", 1) == 1:
            self.tear_down_cluster()
        else:
            self.log.info("cluster tearDown skipped")

        self.log.info("routine finished")
342
343    def tear_down_buckets(self):
344        self.log.info("tearing down bucket")
345        BucketOperationHelper.delete_all_buckets_or_assert(self.input.servers,
346                                                           self)
347        self.log.info("bucket teared down")
348
349    def tear_down_cluster(self):
350        self.log.info("tearing down cluster")
351        ClusterOperationHelper.cleanup_cluster(self.input.servers)
352        ClusterOperationHelper.wait_for_ns_servers_or_assert(self.input.servers,
353                                                             self)
354        self.log.info("Cluster teared down")
355
356    def set_up_proxy(self, bucket=None):
357        """Set up and start Moxi"""
358
359        if self.input.moxis:
360            self.log.info("setting up proxy")
361
362            bucket = bucket or self.param('bucket', 'default')
363
364            shell = RemoteMachineShellConnection(self.input.moxis[0])
365            shell.start_moxi(self.input.servers[0].ip, bucket,
366                             self.input.moxis[0].port)
367            shell.disconnect()
368
369    def tear_down_proxy(self):
370        if len(self.input.moxis) > 0:
371            shell = RemoteMachineShellConnection(self.input.moxis[0])
372            shell.stop_moxi()
373            shell.disconnect()
374
375    # Returns "host:port" of moxi to hit.
376    def target_host_port(self, bucket='default', use_direct=False):
377        rv = self.param('moxi', None)
378        if use_direct:
379            return "%s:%s" % (self.input.servers[0].ip,
380                              '11210')
381        if rv:
382            return rv
383        if len(self.input.moxis) > 0:
384            return "%s:%s" % (self.input.moxis[0].ip,
385                              self.input.moxis[0].port)
386        return "%s:%s" % (self.input.servers[0].ip,
387                          self.rest.get_bucket(bucket).nodes[0].moxi)
388
389    def protocol_parse(self, protocol_in, use_direct=False):
390        if protocol_in.find('://') >= 0:
391            if protocol_in.find("couchbase:") >= 0:
392                protocol = "couchbase"
393            else:
394                protocol = \
395                    '-'.join(((["membase"] +
396                    protocol_in.split("://"))[-2] + "-binary").split('-')[0:2])
397            host_port = ('@' + protocol_in.split("://")[-1]).split('@')[-1]
398            user, pswd = (('@' +
399                           protocol_in.split("://")[-1]).split('@')[-2] +
400                           ":").split(':')[0:2]
401        else:
402            protocol = 'memcached-' + protocol_in
403            host_port = self.target_host_port(use_direct=use_direct)
404            user = self.param("rest_username", "Administrator")
405            pswd = self.param("rest_password", "password")
406        return protocol, host_port, user, pswd
407
    def mk_protocol(self, host, port='8091', prefix='membase-binary'):
        """Build a protocol URL; overridable via the 'protocol' test param."""
        return self.param('protocol',
                          prefix + '://' + host + ':' + port)
411
412    def get_backups(self, protocol):
413        """ Get backup server lists for memcached-binary """
414        port = protocol.split(":")[-1]
415        return map(lambda server: "%s:%s" % (server.ip, port),
416                   self.input.servers[1:])
417
    def restartProxy(self, bucket=None):
        """Restart Moxi: tear it down, then set it up against *bucket*."""
        self.tear_down_proxy()
        self.set_up_proxy(bucket)
421
422    def set_up_dgm(self):
423        """Download fragmented, DGM dataset onto each cluster node, if not
424        already locally available.
425
426        The number of vbuckets and database schema must match the
427        target cluster.
428
429        Shutdown all cluster nodes.
430
431        Do a cluster-restore.
432
433        Restart all cluster nodes."""
434
435        bucket = self.param("bucket", "default")
436        ClusterOperationHelper.stop_cluster(self.input.servers)
437        for server in self.input.servers:
438            remote = RemoteMachineShellConnection(server)
439            #TODO: Better way to pass num_nodes and db_size?
440            self.get_data_files(remote, bucket, 1, 10)
441            remote.disconnect()
442        ClusterOperationHelper.start_cluster(self.input.servers)
443
444    def get_data_files(self, remote, bucket, num_nodes, db_size):
445        base = 'https://s3.amazonaws.com/database-analysis'
446        dir = '/tmp/'
447        if remote.is_couchbase_installed():
448            dir = dir + '/couchbase/{0}-{1}-{2}/'.format(num_nodes, 256,
449                                                         db_size)
450            output, error = remote.execute_command('mkdir -p {0}'.format(dir))
451            remote.log_command_output(output, error)
452            file = '{0}_cb.tar.gz'.format(bucket)
453            base_url = base + '/couchbase/{0}-{1}-{2}/{3}'.format(num_nodes,
454                                                                  256, db_size,
455                                                                  file)
456        else:
457            dir = dir + '/membase/{0}-{1}-{2}/'.format(num_nodes, 1024,
458                                                       db_size)
459            output, error = remote.execute_command('mkdir -p {0}'.format(dir))
460            remote.log_command_output(output, error)
461            file = '{0}_mb.tar.gz'.format(bucket)
462            base_url = base + '/membase/{0}-{1}-{2}/{3}'.format(num_nodes,
463                                                                1024, db_size,
464                                                                file)
465
466
467        info = remote.extract_remote_info()
468        wget_command = 'wget'
469        if info.type.lower() == 'windows':
470            wget_command = \
471                "cd {0} ;cmd /c 'c:\\automation\\wget.exe --no-check-certificate"\
472                .format(dir)
473
474        # Check if the file exists on the remote server else download the gzipped version
475        # Extract if necessary
476        exist = remote.file_exists(dir, file)
477        if not exist:
478            additional_quote = ""
479            if info.type.lower() == 'windows':
480                additional_quote = "'"
481            command = "{0} -v -O {1}{2} {3} {4} ".format(wget_command, dir,
482                                                         file, base_url,
483                                                         additional_quote)
484            output, error = remote.execute_command(command)
485            remote.log_command_output(output, error)
486
487        if remote.is_couchbase_installed():
488            if info.type.lower() == 'windows':
489                destination_folder = testconstants.WIN_COUCHBASE_DATA_PATH
490            else:
491                destination_folder = testconstants.COUCHBASE_DATA_PATH
492        else:
493            if info.type.lower() == 'windows':
494                destination_folder = testconstants.WIN_MEMBASE_DATA_PATH
495            else:
496                destination_folder = testconstants.MEMBASE_DATA_PATH
497        if self.data_path:
498            destination_folder = self.data_path
499        untar_command = 'cd {1}; tar -xzf {0}'.format(dir + file,
500                                                      destination_folder)
501        output, error = remote.execute_command(untar_command)
502        remote.log_command_output(output, error)
503
504    def _exec_and_log(self, shell, cmd):
505        """helper method to execute a command and log output"""
506        if not cmd or not shell:
507            return
508
509        output, error = shell.execute_command(cmd)
510        shell.log_command_output(output, error)
511
512    def _build_tar_name(self, bucket, version="unknown_version",
513                        file_base=None):
514        """build tar file name.
515
516        {file_base}-{version}-{bucket}.tar.gz
517        """
518        if not file_base:
519            file_base = os.path.splitext(
520                os.path.basename(self.param("conf_file",
521                                 PerfDefaults.conf_file)))[0]
522        return "{0}-{1}-{2}.tar.gz".format(file_base, version, bucket)
523
    def _save_snapshot(self, server, bucket, file_base=None):
        """Tar one server's data files into a sibling "-snapshots" dir.

        Always returns True. The cluster is stopped by the caller
        (see save_snapshots) before this runs.
        """

        src_data_path = os.path.dirname(server.data_path or
                                        testconstants.COUCHBASE_DATA_PATH)
        dest_data_path = "{0}-snapshots".format(src_data_path)

        self.log.info("server={0}, src_data_path={1}, dest_data_path={2}"
                      .format(server.ip, src_data_path, dest_data_path))

        shell = RemoteMachineShellConnection(server)

        # Archive name embeds the installed Couchbase version.
        build_name, short_version, full_version = \
            shell.find_build_version("/opt/couchbase/", "VERSION.txt", "cb")

        dest_file = self._build_tar_name(bucket, full_version, file_base)

        self._exec_and_log(shell, "mkdir -p {0}".format(dest_data_path))

        # save as gzip file, if file exsits, overwrite
        # TODO: multiple buckets
        zip_cmd = "cd {0}; tar -cvzf {1}/{2} {3} {3}-data _*"\
            .format(src_data_path, dest_data_path, dest_file, bucket)
        self._exec_and_log(shell, zip_cmd)

        shell.disconnect()
        return True
551
    def _load_snapshot(self, server, bucket, file_base=None, overwrite=True):
        """Restore one server's data files from a snapshot tarball.

        Returns False when the expected snapshot file is missing, True
        otherwise. With overwrite=False the current data files are saved to
        a timestamped snapshot before being replaced.
        """

        dest_data_path = os.path.dirname(server.data_path or
                                         testconstants.COUCHBASE_DATA_PATH)
        src_data_path = "{0}-snapshots".format(dest_data_path)

        self.log.info("server={0}, src_data_path={1}, dest_data_path={2}"
                      .format(server.ip, src_data_path, dest_data_path))

        shell = RemoteMachineShellConnection(server)

        # Snapshot name embeds the installed Couchbase version.
        build_name, short_version, full_version = \
            shell.find_build_version("/opt/couchbase/", "VERSION.txt", "cb")

        src_file = self._build_tar_name(bucket, full_version, file_base)

        if not shell.file_exists(src_data_path, src_file):
            self.log.error("file '{0}/{1}' does not exist"
                           .format(src_data_path, src_file))
            shell.disconnect()
            return False

        if not overwrite:
            self._save_snapshot(server, bucket,
                                "{0}.tar.gz".format(
                                    time.strftime(PerfDefaults.strftime)))  # TODO: filename

        # Wipe the current data files before untarring the snapshot.
        rm_cmd = "rm -rf {0}/{1} {0}/{1}-data {0}/_*".format(dest_data_path,
                                                             bucket)
        self._exec_and_log(shell, rm_cmd)

        unzip_cmd = "cd {0}; tar -xvzf {1}/{2}".format(dest_data_path,
                                                       src_data_path, src_file)
        self._exec_and_log(shell, unzip_cmd)

        shell.disconnect()
        return True
590
591    def save_snapshots(self, file_base, bucket):
592        """Save snapshots on all servers"""
593        if not self.input.servers or not bucket:
594            self.log.error("invalid server list or bucket name")
595            return False
596
597        ClusterOperationHelper.stop_cluster(self.input.servers)
598
599        for server in self.input.servers:
600            self._save_snapshot(server, bucket, file_base)
601
602        ClusterOperationHelper.start_cluster(self.input.servers)
603
604        return True
605
606    def load_snapshots(self, file_base, bucket):
607        """Load snapshots on all servers"""
608        if not self.input.servers or not bucket:
609            self.log.error("invalid server list or bucket name")
610            return False
611
612        ClusterOperationHelper.stop_cluster(self.input.servers)
613
614        for server in self.input.servers:
615            if not self._load_snapshot(server, bucket, file_base):
616                ClusterOperationHelper.start_cluster(self.input.servers)
617                return False
618
619        ClusterOperationHelper.start_cluster(self.input.servers)
620
621        return True
622
    def spec(self, reference):
        """Record the test spec name (stats export id); the 'spec' test
        param overrides *reference*."""
        self.spec_reference = self.param("spec", reference)
625
    def mk_stats(self, verbosity):
        """Create a StatsCollector with the given verbosity."""
        return StatsCollector(verbosity)
628
629    def _get_src_version(self):
630        """get testrunner version"""
631        try:
632            result = subprocess.Popen(['git', 'rev-parse', 'HEAD'],
633                                      stdout=subprocess.PIPE).communicate()[0]
634        except subprocess.CalledProcessError as e:
635            self.log.error("unable to get src code version : {0}".format(e))
636            return "unknown version"
637        return result.rstrip()[:7]
638
639    def start_stats(self, stats_spec, servers=None,
640                    process_names=('memcached', 'beam.smp'), test_params=None,
641                    client_id='', collect_server_stats=True, ddoc=None):
642        if self.parami('stats', 1) == 0:
643            return None
644
645        servers = servers or self.input.servers
646        clusters = None
647        if hasattr(self, "get_region"):
648            if self.parami("access_phase", 0):
649                clusters = self.input.clusters
650                if self.get_region() == "west":
651                    clusters[0], clusters[1] = clusters[1], clusters[0]
652        sc = self.mk_stats(False)
653        bucket = self.param("bucket", "default")
654        sc.start(servers, bucket, process_names, stats_spec, client_id,
655                 collect_server_stats=collect_server_stats, ddoc=ddoc,
656                 clusters=clusters)
657        test_params['testrunner'] = self._get_src_version()
658        self.test_params = test_params
659        self.sc = sc
660        return self.sc
661
    def end_stats(self, sc, total_stats=None, stats_spec=None):
        """Stop the stats collector *sc* and export its results.

        No-op when sc is None; stats_spec falls back to self.spec_reference.
        total_stats, when given, is folded into the collector before export.
        """
        if sc is None:
            return
        if stats_spec is None:
            stats_spec = self.spec_reference
        if total_stats:
            sc.total_stats(total_stats)
        self.log.info("stopping stats collector")
        sc.stop()
        self.log.info("stats collector is stopped")
        sc.export(stats_spec, self.test_params)
673
674    def load(self, num_items, min_value_size=None,
675             kind='binary',
676             protocol='binary',
677             ratio_sets=1.0,
678             ratio_hot_sets=0.0,
679             ratio_hot_gets=0.0,
680             ratio_expirations=0.0,
681             expiration=None,
682             prefix="",
683             doc_cache=1,
684             use_direct=True,
685             report=0,
686             start_at=-1,
687             collect_server_stats=True,
688             is_eperf=False,
689             hot_shift=0):
690        cfg = {'max-items': num_items,
691               'max-creates': num_items,
692               'max-ops-per-sec': self.parami("load_mcsoda_max_ops_sec",
693                                              PerfDefaults.mcsoda_max_ops_sec),
694               'min-value-size': min_value_size or self.parami("min_value_size",
695                                                               1024),
696               'ratio-sets': self.paramf("load_ratio_sets", ratio_sets),
697               'ratio-misses': self.paramf("load_ratio_misses", 0.0),
698               'ratio-creates': self.paramf("load_ratio_creates", 1.0),
699               'ratio-deletes': self.paramf("load_ratio_deletes", 0.0),
700               'ratio-hot': 0.0,
701               'ratio-hot-sets': ratio_hot_sets,
702               'ratio-hot-gets': ratio_hot_gets,
703               'ratio-expirations': ratio_expirations,
704               'expiration': expiration or 0,
705               'exit-after-creates': 1,
706               'json': int(kind == 'json'),
707               'batch': self.parami("batch", PerfDefaults.batch),
708               'vbuckets': self.vbucket_count,
709               'doc-cache': doc_cache,
710               'prefix': prefix,
711               'report': report,
712               'hot-shift': hot_shift,
713               'cluster_name': self.param("cluster_name", "")}
714        cur = {}
715        if start_at >= 0:
716            cur['cur-items'] = start_at
717            cur['cur-gets'] = start_at
718            cur['cur-sets'] = start_at
719            cur['cur-ops'] = cur['cur-gets'] + cur['cur-sets']
720            cur['cur-creates'] = start_at
721            cfg['max-creates'] = start_at + num_items
722            cfg['max-items'] = cfg['max-creates']
723
724        cfg_params = cfg.copy()
725        cfg_params['test_time'] = time.time()
726        cfg_params['test_name'] = self.id()
727
728        # phase: 'load' or 'reload'
729        phase = "load"
730        if self.parami("hot_load_phase", 0) == 1:
731            # all gets
732            if self.parami("hot_load_get", PerfDefaults.hot_load_get) == 1:
733                cfg['ratio-sets'] = 0
734                cfg['exit-after-creates'] = 0
735                cfg['exit-after-gets'] = 1
736                cfg['max-gets'] = start_at + num_items
737            phase = "reload"
738
739        if is_eperf:
740            collect_server_stats = self.parami("prefix", 0) == 0
741            client_id = self.parami("prefix", 0)
742            sc = self.start_stats("{0}.{1}".format(self.spec_reference, phase),  # stats spec e.x: testname.load
743                                  test_params=cfg_params, client_id=client_id,
744                                  collect_server_stats=collect_server_stats)
745
746        # For Black box, multi node tests
747        # always use membase-binary
748        if self.is_multi_node:
749            protocol = self.mk_protocol(host=self.input.servers[0].ip,
750                                        port=self.input.servers[0].port)
751
752        protocol, host_port, user, pswd = \
753            self.protocol_parse(protocol, use_direct=use_direct)
754
755        if not user.strip():
756            if "11211" in host_port:
757                user = self.param("bucket", "default")
758            else:
759                user = self.input.servers[0].rest_username
760        if not pswd.strip():
761            if not "11211" in host_port:
762                pswd = self.input.servers[0].rest_password
763
764        self.log.info("mcsoda %s %s %s %s" %
765                      (protocol, host_port, user, pswd))
766        self.log.info("mcsoda cfg:\n" + pprint.pformat(cfg))
767        self.log.info("mcsoda cur:\n" + pprint.pformat(cfg))
768
769        cur, start_time, end_time = \
770            self.mcsoda_run(cfg, cur, protocol, host_port, user, pswd,
771                            stats_collector=sc, heartbeat=self.parami("mcsoda_heartbeat", 0),
772                            why="load", bucket=self.param("bucket", "default"))
773        self.num_items_loaded = num_items
774        ops = {'tot-sets': cur.get('cur-sets', 0),
775               'tot-gets': cur.get('cur-gets', 0),
776               'tot-items': cur.get('cur-items', 0),
777               'tot-creates': cur.get('cur-creates', 0),
778               'tot-misses': cur.get('cur-misses', 0),
779               "start-time": start_time,
780               "end-time": end_time}
781
782        if is_eperf:
783            if self.parami("load_wait_until_drained", 1) == 1:
784                self.wait_until_drained()
785            if self.parami("load_wait_until_repl",
786                PerfDefaults.load_wait_until_repl) == 1:
787                self.wait_until_repl()
788            self.end_stats(sc, ops, "{0}.{1}".format(self.spec_reference,
789                                                     phase))
790
791        return ops, start_time, end_time
792
793    def mcsoda_run(self, cfg, cur, protocol, host_port, user, pswd,
794                   stats_collector=None, stores=None, ctl=None,
795                   heartbeat=0, why="", bucket="default", backups=None):
796        return mcsoda.run(cfg, cur, protocol, host_port, user, pswd,
797                          stats_collector=stats_collector,
798                          stores=stores,
799                          ctl=ctl,
800                          heartbeat=heartbeat,
801                          why=why,
802                          bucket=bucket,
803                          backups=backups)
804
805    def rebalance_nodes(self, num_nodes, cluster=None):
806        """Rebalance cluster(s) if more than 1 node provided"""
807        if len(self.input.servers) == 1 or num_nodes == 1:
808            self.log.warn("running on single node cluster")
809            return
810        else:
811            self.log.info("rebalancing nodes - num_nodes = {0}"
812                          .format(num_nodes))
813
814        if not cluster:
815            cluster = self.input.servers
816        status, _ = RebalanceHelper.rebalance_in(cluster, num_nodes - 1,
817                                                 do_shuffle=False)
818        self.assertTrue(status)
819
820    def delayed_rebalance_worker(self, servers, num_nodes, delay_seconds, sc,
821                                 max_retries=PerfDefaults.reb_max_retries,
822                                 reb_mode=PerfDefaults.REB_MODE.IN):
823        time.sleep(delay_seconds)
824        gmt_now = time.strftime(PerfDefaults.strftime, time.gmtime())
825        self.log.info("rebalance started")
826
827        if not sc:
828            self.log.error("invalid stats collector")
829            return
830        status = False
831        retries = 0
832        while not status and retries <= max_retries:
833            start_time = time.time()
834            if reb_mode == PerfDefaults.REB_MODE.OUT:
835                status, nodes = RebalanceHelper.rebalance_out(servers, num_nodes)
836            elif reb_mode == PerfDefaults.REB_MODE.SWAP:
837                status, nodes = RebalanceHelper.rebalance_swap(servers, num_nodes)
838            else:
839                status, nodes = RebalanceHelper.rebalance_in(servers,
840                                        num_nodes - 1, do_check=(not retries))
841            end_time = time.time()
842            self.log.info("status: {0}, nodes: {1}, retries: {2}"
843                          .format(status, nodes, retries))
844            if not status:
845                retries += 1
846                time.sleep(delay_seconds)
847        sc.reb_stats(start_time, end_time - start_time)
848        if self.parami("master_events", PerfDefaults.master_events):
849            filename = "master_events.log"
850            with open(filename, "w") as f:
851                f.write(self.rest.diag_master_events()[1])
852
853    def delayed_rebalance(self, num_nodes, delay_seconds=10,
854                          max_retries=PerfDefaults.reb_max_retries,
855                          reb_mode=0, sync=False):
856        self.log.info("delayed_rebalance")
857        if sync:
858            PerfBase.delayed_rebalance_worker(self, self.input.servers,
859                num_nodes, delay_seconds, self.sc, max_retries, reb_mode)
860        else:
861            t = threading.Thread(target=PerfBase.delayed_rebalance_worker,
862                                 args=(self, self.input.servers, num_nodes,
863                                 delay_seconds, self.sc, max_retries, reb_mode))
864            t.daemon = True
865            t.start()
866
867    @staticmethod
868    def set_auto_compaction(server, parallel_compaction, percent_threshold):
869        rest = RestConnection(server)
870        rest.set_auto_compaction(parallel_compaction,
871                                 dbFragmentThresholdPercentage=percent_threshold,
872                                 viewFragmntThresholdPercentage=percent_threshold)
873
874    @staticmethod
875    def delayed_compaction_worker(servers, parallel_compaction,
876                                  percent_threshold, delay_seconds):
877        time.sleep(delay_seconds)
878        PerfBase.set_auto_compaction(servers[0], parallel_compaction,
879                                     percent_threshold)
880
881    def delayed_compaction(self, parallel_compaction="false",
882                           percent_threshold=0.01,
883                           delay_seconds=10):
884        t = threading.Thread(target=PerfBase.delayed_compaction_worker,
885                             args=(self.input.servers,
886                                   parallel_compaction,
887                                   percent_threshold,
888                                   delay_seconds))
889        t.daemon = True
890        t.start()
891
892    def loop(self, num_ops=None,
893             num_items=None,
894             max_items=None,
895             max_creates=None,
896             min_value_size=None,
897             exit_after_creates=0,
898             kind='binary',
899             protocol='binary',
900             clients=1,
901             ratio_misses=0.0,
902             ratio_sets=0.0, ratio_creates=0.0, ratio_deletes=0.0,
903             ratio_hot=0.2, ratio_hot_sets=0.95, ratio_hot_gets=0.95,
904             ratio_expirations=0.0,
905             expiration=None,
906             test_name=None,
907             prefix="",
908             doc_cache=1,
909             use_direct=True,
910             collect_server_stats=True,
911             start_at=-1,
912             report=0,
913             ctl=None,
914             hot_shift=0,
915             is_eperf=False,
916             ratio_queries=0,
917             queries=0,
918             ddoc=None):
919        num_items = num_items or self.num_items_loaded
920
921        hot_stack_size = \
922            self.parami('hot_stack_size', PerfDefaults.hot_stack_size) or \
923            (num_items * ratio_hot)
924
925        cfg = {'max-items': max_items or num_items,
926               'max-creates': max_creates or 0,
927               'max-ops-per-sec': self.parami("mcsoda_max_ops_sec",
928                                              PerfDefaults.mcsoda_max_ops_sec),
929               'min-value-size': min_value_size or self.parami("min_value_size",
930                                                               1024),
931               'exit-after-creates': exit_after_creates,
932               'ratio-sets': ratio_sets,
933               'ratio-misses': ratio_misses,
934               'ratio-creates': ratio_creates,
935               'ratio-deletes': ratio_deletes,
936               'ratio-hot': ratio_hot,
937               'ratio-hot-sets': ratio_hot_sets,
938               'ratio-hot-gets': ratio_hot_gets,
939               'ratio-expirations': ratio_expirations,
940               'ratio-queries': ratio_queries,
941               'expiration': expiration or 0,
942               'threads': clients,
943               'json': int(kind == 'json'),
944               'batch': self.parami("batch", PerfDefaults.batch),
945               'vbuckets': self.vbucket_count,
946               'doc-cache': doc_cache,
947               'prefix': prefix,
948               'queries': queries,
949               'report': report,
950               'hot-shift': hot_shift,
951               'hot-stack': self.parami("hot_stack", PerfDefaults.hot_stack),
952               'hot-stack-size': hot_stack_size,
953               'hot-stack-rotate': self.parami("hot_stack_rotate",
954                                               PerfDefaults.hot_stack_rotate),
955               'cluster_name': self.param("cluster_name", ""),
956               'observe': self.param("observe", PerfDefaults.observe),
957               'obs-backoff': self.paramf('obs_backoff',
958                                          PerfDefaults.obs_backoff),
959               'obs-max-backoff': self.paramf('obs_max_backoff',
960                                              PerfDefaults.obs_max_backoff),
961               'obs-persist-count': self.parami('obs_persist_count',
962                                                PerfDefaults.obs_persist_count),
963               'obs-repl-count': self.parami('obs_repl_count',
964                                             PerfDefaults.obs_repl_count),
965               'woq-pattern': self.parami('woq_pattern',
966                                         PerfDefaults.woq_pattern),
967               'woq-verbose': self.parami('woq_verbose',
968                                         PerfDefaults.woq_verbose),
969               'cor-pattern': self.parami('cor_pattern',
970                                         PerfDefaults.cor_pattern),
971               'cor-persist': self.parami('cor_persist',
972                                         PerfDefaults.cor_persist),
973               'time': self.parami('time', 0),
974               'cbm': self.parami('cbm', PerfDefaults.cbm),
975               'cbm-host': self.param('cbm_host', PerfDefaults.cbm_host),
976               'cbm-port': self.parami('cbm_port', PerfDefaults.cbm_port)}
977
978        cfg_params = cfg.copy()
979        cfg_params['test_time'] = time.time()
980        cfg_params['test_name'] = test_name
981        client_id = ''
982        stores = None
983
984        if is_eperf:
985            client_id = self.parami("prefix", 0)
986        sc = None
987        if self.parami("collect_stats", 1):
988            sc = self.start_stats(self.spec_reference + ".loop",
989                                  test_params=cfg_params, client_id=client_id,
990                                  collect_server_stats=collect_server_stats,
991                                  ddoc=ddoc)
992
993        self.cur = {'cur-items': num_items}
994        if start_at >= 0:
995            self.cur['cur-gets'] = start_at
996        if num_ops is None:
997            num_ops = num_items
998        if isinstance(num_ops, int):
999            cfg['max-ops'] = num_ops
1000        else:
1001            # Here, we num_ops looks like "time to run" tuple of...
1002            # ('seconds', integer_num_of_seconds_to_run)
1003            cfg['time'] = num_ops[1]
1004
1005        # For Black box, multi node tests
1006        # always use membase-binary
1007        if self.is_multi_node:
1008            protocol = self.mk_protocol(host=self.input.servers[0].ip,
1009                                        port=self.input.servers[0].port)
1010
1011        backups = self.get_backups(protocol)
1012        self.log.info("mcsoda protocol %s" % protocol)
1013        protocol, host_port, user, pswd = \
1014            self.protocol_parse(protocol, use_direct=use_direct)
1015
1016        if not user.strip():
1017            if "11211" in host_port:
1018                user = self.param("bucket", "default")
1019            else:
1020                user = self.input.servers[0].rest_username
1021        if not pswd.strip():
1022            if not "11211" in host_port:
1023                pswd = self.input.servers[0].rest_password
1024
1025        self.log.info("mcsoda %s %s %s %s" %
1026                      (protocol, host_port, user, pswd))
1027        self.log.info("mcsoda cfg:\n" + pprint.pformat(cfg))
1028        self.log.info("mcsoda cur:\n" + pprint.pformat(cfg))
1029        self.log.info("mcsoda backups: %s" % backups)
1030
1031        # For query tests always use StoreCouchbase
1032        if protocol == "couchbase":
1033            stores = [StoreCouchbase()]
1034
1035        self.cur, start_time, end_time = \
1036            self.mcsoda_run(cfg, self.cur, protocol, host_port, user, pswd,
1037                            stats_collector=sc, ctl=ctl, stores=stores,
1038                            heartbeat=self.parami("mcsoda_heartbeat", 0),
1039                            why="loop", bucket=self.param("bucket", "default"),
1040                            backups=backups)
1041
1042        ops = {'tot-sets': self.cur.get('cur-sets', 0),
1043               'tot-gets': self.cur.get('cur-gets', 0),
1044               'tot-items': self.cur.get('cur-items', 0),
1045               'tot-creates': self.cur.get('cur-creates', 0),
1046               'tot-misses': self.cur.get('cur-misses', 0),
1047               "start-time": start_time,
1048               "end-time": end_time}
1049
1050        if self.parami("loop_wait_until_drained",
1051                       PerfDefaults.loop_wait_until_drained):
1052            self.wait_until_drained()
1053
1054        if self.parami("loop_wait_until_repl",
1055                       PerfDefaults.loop_wait_until_repl):
1056            self.wait_until_repl()
1057
1058        if self.parami("collect_stats", 1) and \
1059                not self.parami("reb_no_fg", PerfDefaults.reb_no_fg):
1060            self.end_stats(sc, ops, self.spec_reference + ".loop")
1061
1062        why = self.params("why", "main")
1063        prefix = self.parami("prefix", 0)
1064        self.log.info("finished")
1065
1066        return ops, start_time, end_time
1067
1068    def wait_until_drained(self):
1069        self.log.info("draining disk write queue")
1070
1071        master = self.input.servers[0]
1072        bucket = self.param("bucket", "default")
1073        ready = RebalanceHelper.wait_for_persistence(master, bucket)
1074        self.assertTrue(ready, "not all items persisted. see logs")
1075
1076        self.log.info("disk write queue has been drained")
1077        return time.time()
1078
1079    def wait_until_repl(self):
1080        self.log.info("waiting for replication")
1081
1082        master = self.input.servers[0]
1083        bucket = self.param("bucket", "default")
1084
1085        RebalanceHelper.wait_for_stats_on_all(master, bucket,
1086            'vb_replica_queue_size', 0,
1087            fn=RebalanceHelper.wait_for_stats_no_timeout)
1088
1089        RebalanceHelper.wait_for_stats_on_all(master, bucket,
1090            'ep_tap_replica_queue_itemondisk', 0,
1091            fn=RebalanceHelper.wait_for_stats_no_timeout)
1092
1093        RebalanceHelper.wait_for_stats_on_all(master, bucket,
1094            'ep_tap_rebalance_queue_backfillremaining', 0,
1095            fn=RebalanceHelper.wait_for_stats_no_timeout)
1096
1097        RebalanceHelper.wait_for_stats_on_all(master, bucket,
1098            'ep_tap_replica_qlen', 0,
1099            fn=RebalanceHelper.wait_for_stats_no_timeout)
1100
1101        self.log.info("replication is done")
1102
1103    def warmup(self, collect_stats=True, flush_os_cache=False):
1104        """
1105        Restart cluster and wait for it to warm up.
1106        In current version, affect the master node only.
1107        """
1108        if not self.input.servers:
1109            self.log.error("empty server list")
1110            return
1111
1112        if collect_stats:
1113            client_id = self.parami("prefix", 0)
1114            test_params = {'test_time': time.time(),
1115                           'test_name': self.id(),
1116                           'json': 0}
1117            sc = self.start_stats(self.spec_reference + ".warmup",
1118                                  test_params=test_params,
1119                                  client_id=client_id)
1120
1121        self.log.info("preparing to warmup cluster ...")
1122
1123        server = self.input.servers[0]
1124        shell = RemoteMachineShellConnection(server)
1125
1126        start_time = time.time()
1127
1128        self.log.info("stopping couchbase ... ({0})".format(server.ip))
1129        shell.stop_couchbase()
1130        self.log.info("couchbase stopped ({0})".format(server.ip))
1131
1132        if flush_os_cache:
1133            self.log.info("flushing os cache ...")
1134            shell.flush_os_caches()
1135
1136        shell.start_couchbase()
1137        self.log.info("couchbase restarted ({0})".format(server.ip))
1138
1139        self.wait_until_warmed_up()
1140        self.log.info("warmup finished")
1141
1142        end_time = time.time()
1143        ops = {'tot-sets': 0,
1144               'tot-gets': 0,
1145               'tot-items': 0,
1146               'tot-creates': 0,
1147               'tot-misses': 0,
1148               "start-time": start_time,
1149               "end-time": end_time}
1150
1151        if collect_stats:
1152            self.end_stats(sc, ops, self.spec_reference + ".warmup")
1153
1154    def wait_until_warmed_up(self, master=None):
1155        if not master:
1156            master = self.input.servers[0]
1157
1158        bucket = self.param("bucket", "default")
1159
1160        fn = RebalanceHelper.wait_for_mc_stats_no_timeout
1161        for bucket in self.buckets:
1162            RebalanceHelper.wait_for_stats_on_all(master, bucket,
1163                                                  'ep_warmup_thread',
1164                                                  'complete', fn=fn)
1165    def set_param(self, name, val):
1166
1167        input = getattr(self, "input", TestInputSingleton.input)
1168        input.test_params[name] = str(val)
1169
1170        return True
1171
1172    def wait_for_task_completion(self, task='indexer'):
1173        """Wait for ns_server task to finish"""
1174        t0 = time.time()
1175        self.log.info("Waiting 30 seconds before {0} monitoring".format(task))
1176        time.sleep(30)
1177
1178        while True:
1179            tasks = self.rest.ns_server_tasks()
1180            if tasks:
1181                try:
1182                    progress = [t['progress'] for t in tasks if t['type'] == task]
1183                except TypeError:
1184                    self.log.error(tasks)
1185                else:
1186                    if progress:
1187                        self.log.info("{0} progress: {1}".format(task, progress))
1188                        time.sleep(10)
1189                    else:
1190                        break
1191
1192        t1 = time.time()
1193        self.log.info("Time taken to perform task: {0} sec".format(t1 - t0))
1194
1195    def param(self, name, default_value):
1196        input = getattr(self, "input", TestInputSingleton.input)
1197        return input.test_params.get(name, default_value)
1198
1199    def parami(self, name, default_int):
1200        return int(self.param(name, default_int))
1201
1202    def paramf(self, name, default_float):
1203        return float(self.param(name, default_float))
1204
1205    def params(self, name, default_str):
1206        return str(self.param(name, default_str))
1207