1import unittest
2import time
3import threading
4import os
5import subprocess
6import pprint
7import logging
8
9from TestInput import TestInputSingleton
10from lib import logger
11from lib import testconstants
12from lib.membase.api.rest_client import RestConnection, RestHelper, Bucket
13from lib.membase.helper.bucket_helper import BucketOperationHelper
14from lib.membase.helper.cluster_helper import ClusterOperationHelper
15from lib.membase.helper.rebalance_helper import RebalanceHelper
16from lib.membase.performance.stats import StatsCollector
17from lib.remote.remote_util import RemoteMachineShellConnection
18from lib.perf_engines.cbsoda import StoreCouchbase
19
20from pytests.performance.perf_defaults import PerfDefaults
21from lib.perf_engines import mcsoda
22
23
24class PerfBase(unittest.TestCase):
25
26    """
27    specURL = http://hub.internal.couchbase.org/confluence/display/cbit/Black+Box+Performance+Test+Matrix
28
29    """
30
31    # The setUpBaseX() methods allow subclasses to resequence the setUp() and
32    # skip cluster configuration.
    def setUpBase0(self):
        """First setup stage: logging, test input, REST connection to master.

        Split out of setUp() so subclasses can resequence setup and skip
        cluster configuration.
        """
        self.log = logger.Logger.get_logger()
        self.input = TestInputSingleton.input
        if self.input.param("log_level", None):
            # Open the logger itself fully (level 0 == NOTSET) and filter
            # per handler: file handlers always log DEBUG, all other
            # handlers use the requested level name (e.g. "INFO").
            self.log.setLevel(level=0)
            for hd in self.log.handlers:
                if str(hd.__class__).find('FileHandler') != -1:
                    hd.setLevel(level=logging.DEBUG)
                else:
                    hd.setLevel(level=getattr(logging, self.input.param("log_level", None)))
        self.vbucket_count = PerfDefaults.vbuckets
        self.sc = None  # stats collector; set later by start_stats()
        if self.parami("tear_down_on_setup",
                       PerfDefaults.tear_down_on_setup) == 1:
            self.tearDown()  # Tear down in case previous run had unclean death
        master = self.input.servers[0]
        self.set_up_rest(master)
50
51    def setUpBase1(self):
52        if max(self.parami('num_buckets', 1),
53               self.parami('xdcr_num_buckets', 1)) > 1:
54            bucket = 'bucket-0'
55        else:
56            bucket = self.param('bucket', 'default')
57        vBuckets = self.rest.get_vbuckets(bucket)
58        self.vbucket_count = len(vBuckets) if vBuckets else 0
59
    def setUp(self):
        """Full test setup: cluster init, rebalance, buckets, proxy, tuning.

        Multi-cluster (XDCR-style) runs repeat each stage per cluster,
        re-pointing the REST connection at each cluster's master; single
        cluster runs operate on self.input.servers only.
        """
        self.setUpBase0()

        # Optional memcached thread-count override on every node.
        mc_threads = self.parami("mc_threads", PerfDefaults.mc_threads)
        if mc_threads != PerfDefaults.mc_threads:
            for node in self.input.servers:
                self.set_mc_threads(node, mc_threads)

        # Optional Erlang scheduler override on every node.
        erlang_schedulers = self.param("erlang_schedulers",
                                       PerfDefaults.erlang_schedulers)
        if erlang_schedulers:
            ClusterOperationHelper.set_erlang_schedulers(self.input.servers,
                                                         erlang_schedulers)
        master = self.input.servers[0]

        self.is_multi_node = False
        self.data_path = master.data_path

        # Number of items loaded by load() method.
        # Does not include or count any items that came from set_up_dgm().
        #
        self.num_items_loaded = 0

        # Cluster initialization (per cluster for multi-cluster runs).
        if self.input.clusters:
            for cluster in self.input.clusters.values():
                master = cluster[0]
                self.set_up_rest(master)
                self.set_up_cluster(master)
        else:
            master = self.input.servers[0]
            self.set_up_cluster(master)

        # Rebalance
        # NOTE(review): rebalance_nodes() is not defined in this file --
        # presumably provided by a subclass or mixin; verify before reuse.
        if self.input.clusters:
            for cluster in self.input.clusters.values():
                num_nodes = self.parami("num_nodes_before", len(cluster))
                self.rebalance_nodes(num_nodes, cluster)
        else:
            num_nodes = self.parami("num_nodes", 10)
            self.rebalance_nodes(num_nodes)

        # Bucket creation (after rebalance).
        if self.input.clusters:
            for cluster in self.input.clusters.values():
                master = cluster[0]
                self.set_up_rest(master)
                self.set_up_buckets()
        else:
            self.set_up_buckets()

        self.set_up_proxy()

        # Post-bucket tuning (loglevel, XDCR, compaction, ...).
        if self.input.clusters:
            for cluster in self.input.clusters.values():
                master = cluster[0]
                self.set_up_rest(master)
                self.reconfigure()
        else:
            self.reconfigure()

        # Optionally restore a disk-greater-than-memory dataset.
        if self.parami("dgm", getattr(self, "dgm", 1)):
            self.set_up_dgm()

        time.sleep(10)
        self.setUpBase1()

        # NOTE(review): wait_until_warmed_up() is not defined in this file.
        if self.input.clusters:
            for cluster in self.input.clusters.values():
                self.wait_until_warmed_up(cluster[0])
        else:
            self.wait_until_warmed_up()
        ClusterOperationHelper.flush_os_caches(self.input.servers)
131
    def set_up_rest(self, master):
        """Point self.rest / self.rest_helper at the given master node."""
        self.rest = RestConnection(master)
        self.rest_helper = RestHelper(self.rest)
135
136    def set_up_cluster(self, master):
137        """Initialize cluster"""
138        self.log.info("setting up cluster")
139
140        self.rest.init_cluster(master.rest_username, master.rest_password)
141
142        memory_quota = self.parami('mem_quota', PerfDefaults.mem_quota)
143        self.rest.init_cluster_memoryQuota(master.rest_username,
144                                           master.rest_password,
145                                           memoryQuota=memory_quota)
146
147    def _get_bucket_names(self, num_buckets):
148        """
149        Get a list of bucket names
150        """
151        if num_buckets > 1:
152            buckets = ['bucket-{0}'.format(i) for i in range(num_buckets)]
153        else:
154            buckets = [self.param('bucket', 'default')]
155
156        return buckets
157
158    def get_bucket_conf(self):
159        """ retrieve bucket configurations"""
160
161        num_buckets = max(self.parami('num_buckets', 1),
162                          self.parami('xdcr_num_buckets', 1))
163        self.buckets = self._get_bucket_names(num_buckets)
164
165    def set_up_buckets(self):
166        """Set up data bucket(s)"""
167
168        self.log.info("setting up buckets")
169
170        self.get_bucket_conf()
171
172        for bucket in self.buckets:
173            bucket_ram_quota = self.parami('mem_quota', PerfDefaults.mem_quota)
174            bucket_threads_num = self.parami('threads_number', PerfDefaults.threads_number)
175            bucket_ram_quota /= max(self.parami('num_buckets', 1),
176                                    self.parami('xdcr_num_buckets', 1))
177            replicas = self.parami('replicas', getattr(self, 'replicas', 1))
178            index_replicas = self.parami('index_replicas', 0)
179
180            self.rest.create_bucket(bucket=bucket, ramQuotaMB=bucket_ram_quota,
181                                    replicaNumber=replicas, authType='sasl',
182                                    threadsNumber=bucket_threads_num,
183                                    replica_index=index_replicas)
184
185            status = self.rest_helper.vbucket_map_ready(bucket, 60)
186            self.assertTrue(status, msg='vbucket_map not ready .. timed out')
187            status = self.rest_helper.bucket_exists(bucket)
188            self.assertTrue(status,
189                            msg='unable to create {0} bucket'.format(bucket))
190
191    def reconfigure(self):
192        """Customize basic Couchbase setup"""
193        self.log.info("customizing setup")
194
195        self.set_loglevel()
196        self.customize_xdcr_settings()
197        self.set_autocompaction()
198        self.set_exp_pager_stime()
199        self.set_rebalance_options()
200
201    def set_rebalance_options(self):
202        # rebalanceMovesBeforeCompaction
203        rmbc = self.parami('rebalance_moves_before_compaction', 0)
204        if rmbc:
205            cmd = 'ns_config:set(rebalance_moves_before_compaction, {0}).'\
206                .format(rmbc)
207            self.rest.diag_eval(cmd)
208
209    def set_exp_pager_stime(self):
210        exp_pager_stime = self.param('exp_pager_stime',
211                                     PerfDefaults.exp_pager_stime)
212        if exp_pager_stime != PerfDefaults.exp_pager_stime:
213            self.set_ep_param('flush_param', 'exp_pager_stime', exp_pager_stime)
214
215    def set_loglevel(self):
216        """Set custom loglevel"""
217
218        loglevel = self.param('loglevel', None)
219        if loglevel:
220            self.rest.set_global_loglevel(loglevel)
221
    def set_mc_threads(self, node, mc_threads):
        """Change number of memcached threads

        node: the server whose memcached thread count is changed
        mc_threads: new worker-thread count, applied via REST
        """
        rest = RestConnection(node)
        rest.set_mc_threads(mc_threads)
        self.log.info("num of memcached threads = {0}".format(mc_threads))
227
228    def customize_xdcr_settings(self):
229        """Set custom XDCR environment variables"""
230        max_concurrent_reps_per_doc = self.param('max_concurrent_reps_per_doc', None)
231        xdcr_doc_batch_size_kb = self.param('xdcr_doc_batch_size_kb', None)
232        xdcr_checkpoint_interval = self.param('xdcr_checkpoint_interval', None)
233        xdcr_latency_optimization = self.param('xdcr_latency_optimization', None)
234
235        if max_concurrent_reps_per_doc:
236            param = 'xdcrMaxConcurrentReps'
237            value = max_concurrent_reps_per_doc
238        elif xdcr_doc_batch_size_kb:
239            param = 'xdcrDocBatchSizeKb'
240            value = xdcr_doc_batch_size_kb
241        elif xdcr_checkpoint_interval:
242            param = 'xdcrCheckpointInterval'
243            value = xdcr_checkpoint_interval
244        else:
245            return
246
247        self.log.info("changing {0} to {1}".format(param, value))
248
249        for servers in self.input.clusters.values():
250            RestConnection(servers[0]).set_internalSetting(param, value)
251
252    def set_ep_compaction(self, comp_ratio):
253        """Set up ep_engine side compaction ratio"""
254        for server in self.input.servers:
255            shell = RemoteMachineShellConnection(server)
256            cmd = "/opt/couchbase/bin/cbepctl localhost:11210 "\
257                  "set flush_param db_frag_threshold {0}".format(comp_ratio)
258            self._exec_and_log(shell, cmd)
259            shell.disconnect()
260
    def set_autocompaction(self, disable_view_compaction=False):
        """Set custom auto-compaction settings

        disable_view_compaction: when True, force the view threshold to
        100% (effectively off) regardless of the 'view_compaction' param.
        Errors are logged and swallowed so a compaction-settings failure
        does not abort the test.
        """
        try:
            # Parallel database and view compaction
            parallel_compaction = self.param("parallel_compaction",
                                             PerfDefaults.parallel_compaction)
            # Database fragmentation threshold
            db_compaction = self.parami("db_compaction",
                                        PerfDefaults.db_compaction)
            self.log.info("database compaction = {0}".format(db_compaction))

            # ep_engine fragmentation threshold; only pushed when it
            # differs from the default
            ep_compaction = self.parami("ep_compaction",
                                        PerfDefaults.ep_compaction)
            if ep_compaction != PerfDefaults.ep_compaction:
                self.set_ep_compaction(ep_compaction)
                self.log.info("ep_engine compaction = {0}".format(ep_compaction))

            # View fragmentation threshold
            if disable_view_compaction:
                view_compaction = 100
            else:
                view_compaction = self.parami("view_compaction",
                                              PerfDefaults.view_compaction)
            # Set custom auto-compaction settings
            self.rest.set_auto_compaction(parallelDBAndVC=parallel_compaction,
                                          dbFragmentThresholdPercentage=db_compaction,
                                          viewFragmntThresholdPercentage=view_compaction)
        except Exception as e:
            # It's very hard to determine what exception it can raise.
            # Therefore we have to use general handler.
            self.log.error("Error while changing compaction settings: {0}"
                           .format(e))
295
    def set_ep_param(self, type, param, value):
        """
        Set ep-engine specific param, using cbepctl

        type: parameter type, e.g: flush_param, tap_param, etc
        param: parameter name, e.g: exp_pager_stime
        value: new value for the parameter

        NOTE(review): 'type' shadows the builtin of the same name; the
        parameter name is kept for backward compatibility with callers.
        Applies the change on every server, against the first bucket.
        """
        bucket = Bucket(name=self.buckets[0], authType="sasl", saslPassword="")
        for server in self.input.servers:
            shell = RemoteMachineShellConnection(server)
            shell.execute_cbepctl(bucket,
                                  "", "set %s" % type, param, value)
            shell.disconnect()
308
    def tearDown(self):
        """Tear down proxy, stats collector, bucket(s) and cluster.

        Each stage honors its tear_down_* test parameter.  Note the
        inverted top-level flag: "tear_down=1" SKIPS the whole routine.
        """
        if self.parami("tear_down", 0) == 1:
            self.log.info("routine skipped")
            return

        self.log.info("routine starts")

        if self.parami("tear_down_proxy", 1) == 1:
            self.tear_down_proxy()
        else:
            self.log.info("proxy tearDown skipped")

        # Stop an in-flight stats collector, if any.
        if self.sc is not None:
            self.sc.stop()
            self.sc = None

        if self.parami("tear_down_bucket", 0) == 1:
            self.tear_down_buckets()
        else:
            self.log.info("bucket tearDown skipped")

        if self.parami("tear_down_cluster", 1) == 1:
            self.tear_down_cluster()
        else:
            self.log.info("cluster tearDown skipped")

        self.log.info("routine finished")
336
    def tear_down_buckets(self):
        """Delete all buckets on all servers (asserts on failure)."""
        self.log.info("tearing down bucket")
        BucketOperationHelper.delete_all_buckets_or_assert(self.input.servers,
                                                           self)
        self.log.info("bucket teared down")
342
    def tear_down_cluster(self):
        """Eject all nodes and wait for ns_server to settle (asserts on failure)."""
        self.log.info("tearing down cluster")
        ClusterOperationHelper.cleanup_cluster(self.input.servers)
        ClusterOperationHelper.wait_for_ns_servers_or_assert(self.input.servers,
                                                             self)
        self.log.info("Cluster teared down")
349
350    def set_up_proxy(self, bucket=None):
351        """Set up and start Moxi"""
352
353        if self.input.moxis:
354            self.log.info("setting up proxy")
355
356            bucket = bucket or self.param('bucket', 'default')
357
358            shell = RemoteMachineShellConnection(self.input.moxis[0])
359            shell.start_moxi(self.input.servers[0].ip, bucket,
360                             self.input.moxis[0].port)
361            shell.disconnect()
362
363    def tear_down_proxy(self):
364        if len(self.input.moxis) > 0:
365            shell = RemoteMachineShellConnection(self.input.moxis[0])
366            shell.stop_moxi()
367            shell.disconnect()
368
369    # Returns "host:port" of moxi to hit.
370    def target_host_port(self, bucket='default', use_direct=False):
371        rv = self.param('moxi', None)
372        if use_direct:
373            return "%s:%s" % (self.input.servers[0].ip,
374                              '11210')
375        if rv:
376            return rv
377        if len(self.input.moxis) > 0:
378            return "%s:%s" % (self.input.moxis[0].ip,
379                              self.input.moxis[0].port)
380        return "%s:%s" % (self.input.servers[0].ip,
381                          self.rest.get_bucket(bucket).nodes[0].moxi)
382
383    def protocol_parse(self, protocol_in, use_direct=False):
384        if protocol_in.find('://') >= 0:
385            if protocol_in.find("couchbase:") >= 0:
386                protocol = "couchbase"
387            else:
388                protocol = \
389                    '-'.join(((["membase"] +
390                    protocol_in.split("://"))[-2] + "-binary").split('-')[0:2])
391            host_port = ('@' + protocol_in.split("://")[-1]).split('@')[-1]
392            user, pswd = (('@' +
393                           protocol_in.split("://")[-1]).split('@')[-2] +
394                           ":").split(':')[0:2]
395        else:
396            protocol = 'memcached-' + protocol_in
397            host_port = self.target_host_port(use_direct=use_direct)
398            user = self.param("rest_username", "Administrator")
399            pswd = self.param("rest_password", "password")
400        return protocol, host_port, user, pswd
401
    def mk_protocol(self, host, port='8091', prefix='membase-binary'):
        """Build a protocol spec like 'membase-binary://host:8091';
        overridable via the 'protocol' test parameter."""
        return self.param('protocol',
                          prefix + '://' + host + ':' + port)
405
406    def get_backups(self, protocol):
407        """ Get backup server lists for memcached-binary """
408        port = protocol.split(":")[-1]
409        return map(lambda server: "%s:%s" % (server.ip, port),
410                   self.input.servers[1:])
411
    def restartProxy(self, bucket=None):
        """Restart Moxi, optionally pointing it at a different bucket."""
        self.tear_down_proxy()
        self.set_up_proxy(bucket)
415
416    def set_up_dgm(self):
417        """Download fragmented, DGM dataset onto each cluster node, if not
418        already locally available.
419
420        The number of vbuckets and database schema must match the
421        target cluster.
422
423        Shutdown all cluster nodes.
424
425        Do a cluster-restore.
426
427        Restart all cluster nodes."""
428
429        bucket = self.param("bucket", "default")
430        ClusterOperationHelper.stop_cluster(self.input.servers)
431        for server in self.input.servers:
432            remote = RemoteMachineShellConnection(server)
433            #TODO: Better way to pass num_nodes and db_size?
434            self.get_data_files(remote, bucket, 1, 10)
435            remote.disconnect()
436        ClusterOperationHelper.start_cluster(self.input.servers)
437
438    def get_data_files(self, remote, bucket, num_nodes, db_size):
439        base = 'https://s3.amazonaws.com/database-analysis'
440        dir = '/tmp/'
441        if remote.is_couchbase_installed():
442            dir = dir + '/couchbase/{0}-{1}-{2}/'.format(num_nodes, 256,
443                                                         db_size)
444            output, error = remote.execute_command('mkdir -p {0}'.format(dir))
445            remote.log_command_output(output, error)
446            file = '{0}_cb.tar.gz'.format(bucket)
447            base_url = base + '/couchbase/{0}-{1}-{2}/{3}'.format(num_nodes,
448                                                                  256, db_size,
449                                                                  file)
450        else:
451            dir = dir + '/membase/{0}-{1}-{2}/'.format(num_nodes, 1024,
452                                                       db_size)
453            output, error = remote.execute_command('mkdir -p {0}'.format(dir))
454            remote.log_command_output(output, error)
455            file = '{0}_mb.tar.gz'.format(bucket)
456            base_url = base + '/membase/{0}-{1}-{2}/{3}'.format(num_nodes,
457                                                                1024, db_size,
458                                                                file)
459
460
461        info = remote.extract_remote_info()
462        wget_command = 'wget'
463        if info.type.lower() == 'windows':
464            wget_command = \
465                "cd {0} ;cmd /c 'c:\\automation\\wget.exe --no-check-certificate"\
466                .format(dir)
467
468        # Check if the file exists on the remote server else download the gzipped version
469        # Extract if necessary
470        exist = remote.file_exists(dir, file)
471        if not exist:
472            additional_quote = ""
473            if info.type.lower() == 'windows':
474                additional_quote = "'"
475            command = "{0} -v -O {1}{2} {3} {4} ".format(wget_command, dir,
476                                                         file, base_url,
477                                                         additional_quote)
478            output, error = remote.execute_command(command)
479            remote.log_command_output(output, error)
480
481        if remote.is_couchbase_installed():
482            if info.type.lower() == 'windows':
483                destination_folder = testconstants.WIN_COUCHBASE_DATA_PATH
484            else:
485                destination_folder = testconstants.COUCHBASE_DATA_PATH
486        else:
487            if info.type.lower() == 'windows':
488                destination_folder = testconstants.WIN_MEMBASE_DATA_PATH
489            else:
490                destination_folder = testconstants.MEMBASE_DATA_PATH
491        if self.data_path:
492            destination_folder = self.data_path
493        untar_command = 'cd {1}; tar -xzf {0}'.format(dir + file,
494                                                      destination_folder)
495        output, error = remote.execute_command(untar_command)
496        remote.log_command_output(output, error)
497
498    def _exec_and_log(self, shell, cmd):
499        """helper method to execute a command and log output"""
500        if not cmd or not shell:
501            return
502
503        output, error = shell.execute_command(cmd)
504        shell.log_command_output(output, error)
505
506    def _build_tar_name(self, bucket, version="unknown_version",
507                        file_base=None):
508        """build tar file name.
509
510        {file_base}-{version}-{bucket}.tar.gz
511        """
512        if not file_base:
513            file_base = os.path.splitext(
514                os.path.basename(self.param("conf_file",
515                                 PerfDefaults.conf_file)))[0]
516        return "{0}-{1}-{2}.tar.gz".format(file_base, version, bucket)
517
    def _save_snapshot(self, server, bucket, file_base=None):
        """Save data files to a snapshot

        Tars the bucket's data files into a sibling
        "<data_path>-snapshots" directory, named
        {file_base}-{version}-{bucket}.tar.gz; an existing snapshot of
        the same name is overwritten.  Always returns True.  Callers are
        expected to stop the cluster first (see save_snapshots()).
        """

        src_data_path = os.path.dirname(server.data_path or
                                        testconstants.COUCHBASE_DATA_PATH)
        dest_data_path = "{0}-snapshots".format(src_data_path)

        self.log.info("server={0}, src_data_path={1}, dest_data_path={2}"
                      .format(server.ip, src_data_path, dest_data_path))

        shell = RemoteMachineShellConnection(server)

        # snapshot names embed the installed server version
        build_name, short_version, full_version = \
            shell.find_build_version("/opt/couchbase/", "VERSION.txt", "cb")

        dest_file = self._build_tar_name(bucket, full_version, file_base)

        self._exec_and_log(shell, "mkdir -p {0}".format(dest_data_path))

        # save as gzip file, if file exists, overwrite
        # TODO: multiple buckets
        zip_cmd = "cd {0}; tar -cvzf {1}/{2} {3} {3}-data _*"\
            .format(src_data_path, dest_data_path, dest_file, bucket)
        self._exec_and_log(shell, zip_cmd)

        shell.disconnect()
        return True
545
    def _load_snapshot(self, server, bucket, file_base=None, overwrite=True):
        """Load data files from a snapshot

        Unpacks the tar produced by _save_snapshot() over the server's
        data directory.  Returns False when the snapshot file does not
        exist, True otherwise.  With overwrite=False, the current data is
        first preserved as a timestamped snapshot.
        """

        dest_data_path = os.path.dirname(server.data_path or
                                         testconstants.COUCHBASE_DATA_PATH)
        src_data_path = "{0}-snapshots".format(dest_data_path)

        self.log.info("server={0}, src_data_path={1}, dest_data_path={2}"
                      .format(server.ip, src_data_path, dest_data_path))

        shell = RemoteMachineShellConnection(server)

        # snapshot names embed the installed server version
        build_name, short_version, full_version = \
            shell.find_build_version("/opt/couchbase/", "VERSION.txt", "cb")

        src_file = self._build_tar_name(bucket, full_version, file_base)

        if not shell.file_exists(src_data_path, src_file):
            self.log.error("file '{0}/{1}' does not exist"
                           .format(src_data_path, src_file))
            shell.disconnect()
            return False

        if not overwrite:
            # preserve the current data before it is clobbered below
            self._save_snapshot(server, bucket,
                                "{0}.tar.gz".format(
                                    time.strftime(PerfDefaults.strftime)))  # TODO: filename

        # wipe the current data files before unpacking the snapshot
        rm_cmd = "rm -rf {0}/{1} {0}/{1}-data {0}/_*".format(dest_data_path,
                                                             bucket)
        self._exec_and_log(shell, rm_cmd)

        unzip_cmd = "cd {0}; tar -xvzf {1}/{2}".format(dest_data_path,
                                                       src_data_path, src_file)
        self._exec_and_log(shell, unzip_cmd)

        shell.disconnect()
        return True
584
585    def save_snapshots(self, file_base, bucket):
586        """Save snapshots on all servers"""
587        if not self.input.servers or not bucket:
588            self.log.error("invalid server list or bucket name")
589            return False
590
591        ClusterOperationHelper.stop_cluster(self.input.servers)
592
593        for server in self.input.servers:
594            self._save_snapshot(server, bucket, file_base)
595
596        ClusterOperationHelper.start_cluster(self.input.servers)
597
598        return True
599
600    def load_snapshots(self, file_base, bucket):
601        """Load snapshots on all servers"""
602        if not self.input.servers or not bucket:
603            self.log.error("invalid server list or bucket name")
604            return False
605
606        ClusterOperationHelper.stop_cluster(self.input.servers)
607
608        for server in self.input.servers:
609            if not self._load_snapshot(server, bucket, file_base):
610                ClusterOperationHelper.start_cluster(self.input.servers)
611                return False
612
613        ClusterOperationHelper.start_cluster(self.input.servers)
614
615        return True
616
    def spec(self, reference):
        """Record the stats spec name; overridable via the 'spec' parameter."""
        self.spec_reference = self.param("spec", reference)
619
    def mk_stats(self, verbosity):
        """Create a new StatsCollector with the given verbosity."""
        return StatsCollector(verbosity)
622
623    def _get_src_version(self):
624        """get testrunner version"""
625        try:
626            result = subprocess.Popen(['git', 'rev-parse', 'HEAD'],
627                                      stdout=subprocess.PIPE).communicate()[0]
628        except subprocess.CalledProcessError as e:
629            self.log.error("unable to get src code version : {0}".format(e))
630            return "unknown version"
631        return result.rstrip()[:7]
632
633    def start_stats(self, stats_spec, servers=None,
634                    process_names=('memcached', 'beam.smp'), test_params=None,
635                    client_id='', collect_server_stats=True, ddoc=None):
636        if self.parami('stats', 1) == 0:
637            return None
638
639        servers = servers or self.input.servers
640        clusters = None
641        if hasattr(self, "get_region"):
642            if self.parami("access_phase", 0):
643                clusters = self.input.clusters
644                if self.get_region() == "west":
645                    clusters[0], clusters[1] = clusters[1], clusters[0]
646        sc = self.mk_stats(False)
647        bucket = self.param("bucket", "default")
648        sc.start(servers, bucket, process_names, stats_spec, client_id,
649                 collect_server_stats=collect_server_stats, ddoc=ddoc,
650                 clusters=clusters)
651        test_params['testrunner'] = self._get_src_version()
652        self.test_params = test_params
653        self.sc = sc
654        return self.sc
655
656    def end_stats(self, sc, total_stats=None, stats_spec=None):
657        if sc is None:
658            return
659        if stats_spec is None:
660            stats_spec = self.spec_reference
661        if total_stats:
662            sc.total_stats(total_stats)
663        self.log.info("stopping stats collector")
664        sc.stop()
665        self.log.info("stats collector is stopped")
666        sc.export(stats_spec, self.test_params)
667
668    def load(self, num_items, min_value_size=None,
669             kind='binary',
670             protocol='binary',
671             ratio_sets=1.0,
672             ratio_hot_sets=0.0,
673             ratio_hot_gets=0.0,
674             ratio_expirations=0.0,
675             expiration=None,
676             prefix="",
677             doc_cache=1,
678             use_direct=True,
679             report=0,
680             start_at=-1,
681             collect_server_stats=True,
682             is_eperf=False,
683             hot_shift=0):
684        cfg = {'max-items': num_items,
685               'max-creates': num_items,
686               'max-ops-per-sec': self.parami("load_mcsoda_max_ops_sec",
687                                              PerfDefaults.mcsoda_max_ops_sec),
688               'min-value-size': min_value_size or self.parami("min_value_size",
689                                                               1024),
690               'ratio-sets': self.paramf("load_ratio_sets", ratio_sets),
691               'ratio-misses': self.paramf("load_ratio_misses", 0.0),
692               'ratio-creates': self.paramf("load_ratio_creates", 1.0),
693               'ratio-deletes': self.paramf("load_ratio_deletes", 0.0),
694               'ratio-hot': 0.0,
695               'ratio-hot-sets': ratio_hot_sets,
696               'ratio-hot-gets': ratio_hot_gets,
697               'ratio-expirations': ratio_expirations,
698               'expiration': expiration or 0,
699               'exit-after-creates': 1,
700               'json': int(kind == 'json'),
701               'batch': self.parami("batch", PerfDefaults.batch),
702               'vbuckets': self.vbucket_count,
703               'doc-cache': doc_cache,
704               'prefix': prefix,
705               'report': report,
706               'hot-shift': hot_shift,
707               'cluster_name': self.param("cluster_name", "")}
708        cur = {}
709        if start_at >= 0:
710            cur['cur-items'] = start_at
711            cur['cur-gets'] = start_at
712            cur['cur-sets'] = start_at
713            cur['cur-ops'] = cur['cur-gets'] + cur['cur-sets']
714            cur['cur-creates'] = start_at
715            cfg['max-creates'] = start_at + num_items
716            cfg['max-items'] = cfg['max-creates']
717
718        cfg_params = cfg.copy()
719        cfg_params['test_time'] = time.time()
720        cfg_params['test_name'] = self.id()
721
722        # phase: 'load' or 'reload'
723        phase = "load"
724        if self.parami("hot_load_phase", 0) == 1:
725            # all gets
726            if self.parami("hot_load_get", PerfDefaults.hot_load_get) == 1:
727                cfg['ratio-sets'] = 0
728                cfg['exit-after-creates'] = 0
729                cfg['exit-after-gets'] = 1
730                cfg['max-gets'] = start_at + num_items
731            phase = "reload"
732
733        if is_eperf:
734            collect_server_stats = self.parami("prefix", 0) == 0
735            client_id = self.parami("prefix", 0)
736            sc = self.start_stats("{0}.{1}".format(self.spec_reference, phase),  # stats spec e.x: testname.load
737                                  test_params=cfg_params, client_id=client_id,
738                                  collect_server_stats=collect_server_stats)
739
740        # For Black box, multi node tests
741        # always use membase-binary
742        if self.is_multi_node:
743            protocol = self.mk_protocol(host=self.input.servers[0].ip,
744                                        port=self.input.servers[0].port)
745
746        protocol, host_port, user, pswd = \
747            self.protocol_parse(protocol, use_direct=use_direct)
748
749        if not user.strip():
750            if "11211" in host_port:
751                user = self.param("bucket", "default")
752            else:
753                user = self.input.servers[0].rest_username
754        if not pswd.strip():
755            if not "11211" in host_port:
756                pswd = self.input.servers[0].rest_password
757
758        self.log.info("mcsoda %s %s %s %s" %
759                      (protocol, host_port, user, pswd))
760        self.log.info("mcsoda cfg:\n" + pprint.pformat(cfg))
761        self.log.info("mcsoda cur:\n" + pprint.pformat(cfg))
762
763        cur, start_time, end_time = \
764            self.mcsoda_run(cfg, cur, protocol, host_port, user, pswd,
765                            stats_collector=sc, heartbeat=self.parami("mcsoda_heartbeat", 0),
766                            why="load", bucket=self.param("bucket", "default"))
767        self.num_items_loaded = num_items
768        ops = {'tot-sets': cur.get('cur-sets', 0),
769               'tot-gets': cur.get('cur-gets', 0),
770               'tot-items': cur.get('cur-items', 0),
771               'tot-creates': cur.get('cur-creates', 0),
772               'tot-misses': cur.get('cur-misses', 0),
773               "start-time": start_time,
774               "end-time": end_time}
775
776        if is_eperf:
777            if self.parami("load_wait_until_drained", 1) == 1:
778                self.wait_until_drained()
779            if self.parami("load_wait_until_repl",
780                PerfDefaults.load_wait_until_repl) == 1:
781                self.wait_until_repl()
782            self.end_stats(sc, ops, "{0}.{1}".format(self.spec_reference,
783                                                     phase))
784
785        return ops, start_time, end_time
786
787    def mcsoda_run(self, cfg, cur, protocol, host_port, user, pswd,
788                   stats_collector=None, stores=None, ctl=None,
789                   heartbeat=0, why="", bucket="default", backups=None):
790        return mcsoda.run(cfg, cur, protocol, host_port, user, pswd,
791                          stats_collector=stats_collector,
792                          stores=stores,
793                          ctl=ctl,
794                          heartbeat=heartbeat,
795                          why=why,
796                          bucket=bucket,
797                          backups=backups)
798
799    def rebalance_nodes(self, num_nodes, cluster=None):
800        """Rebalance cluster(s) if more than 1 node provided"""
801        if len(self.input.servers) == 1 or num_nodes == 1:
802            self.log.warn("running on single node cluster")
803            return
804        else:
805            self.log.info("rebalancing nodes - num_nodes = {0}"
806                          .format(num_nodes))
807
808        if not cluster:
809            cluster = self.input.servers
810        status, _ = RebalanceHelper.rebalance_in(cluster, num_nodes - 1,
811                                                 do_shuffle=False)
812        self.assertTrue(status)
813
814    def delayed_rebalance_worker(self, servers, num_nodes, delay_seconds, sc,
815                                 max_retries=PerfDefaults.reb_max_retries,
816                                 reb_mode=PerfDefaults.REB_MODE.IN):
817        time.sleep(delay_seconds)
818        gmt_now = time.strftime(PerfDefaults.strftime, time.gmtime())
819        self.log.info("rebalance started")
820
821        if not sc:
822            self.log.error("invalid stats collector")
823            return
824        status = False
825        retries = 0
826        while not status and retries <= max_retries:
827            start_time = time.time()
828            if reb_mode == PerfDefaults.REB_MODE.OUT:
829                status, nodes = RebalanceHelper.rebalance_out(servers, num_nodes)
830            elif reb_mode == PerfDefaults.REB_MODE.SWAP:
831                status, nodes = RebalanceHelper.rebalance_swap(servers, num_nodes)
832            else:
833                status, nodes = RebalanceHelper.rebalance_in(servers,
834                                        num_nodes - 1, do_check=(not retries))
835            end_time = time.time()
836            self.log.info("status: {0}, nodes: {1}, retries: {2}"
837                          .format(status, nodes, retries))
838            if not status:
839                retries += 1
840                time.sleep(delay_seconds)
841        sc.reb_stats(start_time, end_time - start_time)
842        if self.parami("master_events", PerfDefaults.master_events):
843            filename = "master_events.log"
844            with open(filename, "w") as f:
845                f.write(self.rest.diag_master_events()[1])
846
847    def delayed_rebalance(self, num_nodes, delay_seconds=10,
848                          max_retries=PerfDefaults.reb_max_retries,
849                          reb_mode=0, sync=False):
850        self.log.info("delayed_rebalance")
851        if sync:
852            PerfBase.delayed_rebalance_worker(self, self.input.servers,
853                num_nodes, delay_seconds, self.sc, max_retries, reb_mode)
854        else:
855            t = threading.Thread(target=PerfBase.delayed_rebalance_worker,
856                                 args=(self, self.input.servers, num_nodes,
857                                 delay_seconds, self.sc, max_retries, reb_mode))
858            t.daemon = True
859            t.start()
860
861    @staticmethod
862    def set_auto_compaction(server, parallel_compaction, percent_threshold):
863        rest = RestConnection(server)
864        rest.set_auto_compaction(parallel_compaction,
865                                 dbFragmentThresholdPercentage=percent_threshold,
866                                 viewFragmntThresholdPercentage=percent_threshold)
867
868    @staticmethod
869    def delayed_compaction_worker(servers, parallel_compaction,
870                                  percent_threshold, delay_seconds):
871        time.sleep(delay_seconds)
872        PerfBase.set_auto_compaction(servers[0], parallel_compaction,
873                                     percent_threshold)
874
875    def delayed_compaction(self, parallel_compaction="false",
876                           percent_threshold=0.01,
877                           delay_seconds=10):
878        t = threading.Thread(target=PerfBase.delayed_compaction_worker,
879                             args=(self.input.servers,
880                                   parallel_compaction,
881                                   percent_threshold,
882                                   delay_seconds))
883        t.daemon = True
884        t.start()
885
886    def loop(self, num_ops=None,
887             num_items=None,
888             max_items=None,
889             max_creates=None,
890             min_value_size=None,
891             exit_after_creates=0,
892             kind='binary',
893             protocol='binary',
894             clients=1,
895             ratio_misses=0.0,
896             ratio_sets=0.0, ratio_creates=0.0, ratio_deletes=0.0,
897             ratio_hot=0.2, ratio_hot_sets=0.95, ratio_hot_gets=0.95,
898             ratio_expirations=0.0,
899             expiration=None,
900             test_name=None,
901             prefix="",
902             doc_cache=1,
903             use_direct=True,
904             collect_server_stats=True,
905             start_at=-1,
906             report=0,
907             ctl=None,
908             hot_shift=0,
909             is_eperf=False,
910             ratio_queries=0,
911             queries=0,
912             ddoc=None):
913        num_items = num_items or self.num_items_loaded
914
915        hot_stack_size = \
916            self.parami('hot_stack_size', PerfDefaults.hot_stack_size) or \
917            (num_items * ratio_hot)
918
919        cfg = {'max-items': max_items or num_items,
920               'max-creates': max_creates or 0,
921               'max-ops-per-sec': self.parami("mcsoda_max_ops_sec",
922                                              PerfDefaults.mcsoda_max_ops_sec),
923               'min-value-size': min_value_size or self.parami("min_value_size",
924                                                               1024),
925               'exit-after-creates': exit_after_creates,
926               'ratio-sets': ratio_sets,
927               'ratio-misses': ratio_misses,
928               'ratio-creates': ratio_creates,
929               'ratio-deletes': ratio_deletes,
930               'ratio-hot': ratio_hot,
931               'ratio-hot-sets': ratio_hot_sets,
932               'ratio-hot-gets': ratio_hot_gets,
933               'ratio-expirations': ratio_expirations,
934               'ratio-queries': ratio_queries,
935               'expiration': expiration or 0,
936               'threads': clients,
937               'json': int(kind == 'json'),
938               'batch': self.parami("batch", PerfDefaults.batch),
939               'vbuckets': self.vbucket_count,
940               'doc-cache': doc_cache,
941               'prefix': prefix,
942               'queries': queries,
943               'report': report,
944               'hot-shift': hot_shift,
945               'hot-stack': self.parami("hot_stack", PerfDefaults.hot_stack),
946               'hot-stack-size': hot_stack_size,
947               'hot-stack-rotate': self.parami("hot_stack_rotate",
948                                               PerfDefaults.hot_stack_rotate),
949               'cluster_name': self.param("cluster_name", ""),
950               'observe': self.param("observe", PerfDefaults.observe),
951               'obs-backoff': self.paramf('obs_backoff',
952                                          PerfDefaults.obs_backoff),
953               'obs-max-backoff': self.paramf('obs_max_backoff',
954                                              PerfDefaults.obs_max_backoff),
955               'obs-persist-count': self.parami('obs_persist_count',
956                                                PerfDefaults.obs_persist_count),
957               'obs-repl-count': self.parami('obs_repl_count',
958                                             PerfDefaults.obs_repl_count),
959               'woq-pattern': self.parami('woq_pattern',
960                                         PerfDefaults.woq_pattern),
961               'woq-verbose': self.parami('woq_verbose',
962                                         PerfDefaults.woq_verbose),
963               'cor-pattern': self.parami('cor_pattern',
964                                         PerfDefaults.cor_pattern),
965               'cor-persist': self.parami('cor_persist',
966                                         PerfDefaults.cor_persist),
967               'time': self.parami('time', 0),
968               'cbm': self.parami('cbm', PerfDefaults.cbm),
969               'cbm-host': self.param('cbm_host', PerfDefaults.cbm_host),
970               'cbm-port': self.parami('cbm_port', PerfDefaults.cbm_port)}
971
972        cfg_params = cfg.copy()
973        cfg_params['test_time'] = time.time()
974        cfg_params['test_name'] = test_name
975        client_id = ''
976        stores = None
977
978        if is_eperf:
979            client_id = self.parami("prefix", 0)
980        sc = None
981        if self.parami("collect_stats", 1):
982            sc = self.start_stats(self.spec_reference + ".loop",
983                                  test_params=cfg_params, client_id=client_id,
984                                  collect_server_stats=collect_server_stats,
985                                  ddoc=ddoc)
986
987        self.cur = {'cur-items': num_items}
988        if start_at >= 0:
989            self.cur['cur-gets'] = start_at
990        if num_ops is None:
991            num_ops = num_items
992        if isinstance(num_ops, int):
993            cfg['max-ops'] = num_ops
994        else:
995            # Here, we num_ops looks like "time to run" tuple of...
996            # ('seconds', integer_num_of_seconds_to_run)
997            cfg['time'] = num_ops[1]
998
999        # For Black box, multi node tests
1000        # always use membase-binary
1001        if self.is_multi_node:
1002            protocol = self.mk_protocol(host=self.input.servers[0].ip,
1003                                        port=self.input.servers[0].port)
1004
1005        backups = self.get_backups(protocol)
1006        self.log.info("mcsoda protocol %s" % protocol)
1007        protocol, host_port, user, pswd = \
1008            self.protocol_parse(protocol, use_direct=use_direct)
1009
1010        if not user.strip():
1011            if "11211" in host_port:
1012                user = self.param("bucket", "default")
1013            else:
1014                user = self.input.servers[0].rest_username
1015        if not pswd.strip():
1016            if not "11211" in host_port:
1017                pswd = self.input.servers[0].rest_password
1018
1019        self.log.info("mcsoda %s %s %s %s" %
1020                      (protocol, host_port, user, pswd))
1021        self.log.info("mcsoda cfg:\n" + pprint.pformat(cfg))
1022        self.log.info("mcsoda cur:\n" + pprint.pformat(cfg))
1023        self.log.info("mcsoda backups: %s" % backups)
1024
1025        # For query tests always use StoreCouchbase
1026        if protocol == "couchbase":
1027            stores = [StoreCouchbase()]
1028
1029        self.cur, start_time, end_time = \
1030            self.mcsoda_run(cfg, self.cur, protocol, host_port, user, pswd,
1031                            stats_collector=sc, ctl=ctl, stores=stores,
1032                            heartbeat=self.parami("mcsoda_heartbeat", 0),
1033                            why="loop", bucket=self.param("bucket", "default"),
1034                            backups=backups)
1035
1036        ops = {'tot-sets': self.cur.get('cur-sets', 0),
1037               'tot-gets': self.cur.get('cur-gets', 0),
1038               'tot-items': self.cur.get('cur-items', 0),
1039               'tot-creates': self.cur.get('cur-creates', 0),
1040               'tot-misses': self.cur.get('cur-misses', 0),
1041               "start-time": start_time,
1042               "end-time": end_time}
1043
1044        if self.parami("loop_wait_until_drained",
1045                       PerfDefaults.loop_wait_until_drained):
1046            self.wait_until_drained()
1047
1048        if self.parami("loop_wait_until_repl",
1049                       PerfDefaults.loop_wait_until_repl):
1050            self.wait_until_repl()
1051
1052        if self.parami("collect_stats", 1) and \
1053                not self.parami("reb_no_fg", PerfDefaults.reb_no_fg):
1054            self.end_stats(sc, ops, self.spec_reference + ".loop")
1055
1056        why = self.params("why", "main")
1057        prefix = self.parami("prefix", 0)
1058        self.log.info("finished")
1059
1060        return ops, start_time, end_time
1061
1062    def wait_until_drained(self):
1063        self.log.info("draining disk write queue")
1064
1065        master = self.input.servers[0]
1066        bucket = self.param("bucket", "default")
1067        ready = RebalanceHelper.wait_for_persistence(master, bucket)
1068        self.assertTrue(ready, "not all items persisted. see logs")
1069
1070        self.log.info("disk write queue has been drained")
1071        return time.time()
1072
1073    def wait_until_repl(self):
1074        self.log.info("waiting for replication")
1075
1076        master = self.input.servers[0]
1077        bucket = self.param("bucket", "default")
1078
1079        RebalanceHelper.wait_for_stats_on_all(master, bucket,
1080            'vb_replica_queue_size', 0,
1081            fn=RebalanceHelper.wait_for_stats_no_timeout)
1082
1083        RebalanceHelper.wait_for_stats_on_all(master, bucket,
1084            'ep_tap_replica_queue_itemondisk', 0,
1085            fn=RebalanceHelper.wait_for_stats_no_timeout)
1086
1087        RebalanceHelper.wait_for_stats_on_all(master, bucket,
1088            'ep_tap_rebalance_queue_backfillremaining', 0,
1089            fn=RebalanceHelper.wait_for_stats_no_timeout)
1090
1091        RebalanceHelper.wait_for_stats_on_all(master, bucket,
1092            'ep_tap_replica_qlen', 0,
1093            fn=RebalanceHelper.wait_for_stats_no_timeout)
1094
1095        self.log.info("replication is done")
1096
1097    def warmup(self, collect_stats=True, flush_os_cache=False):
1098        """
1099        Restart cluster and wait for it to warm up.
1100        In current version, affect the master node only.
1101        """
1102        if not self.input.servers:
1103            self.log.error("empty server list")
1104            return
1105
1106        if collect_stats:
1107            client_id = self.parami("prefix", 0)
1108            test_params = {'test_time': time.time(),
1109                           'test_name': self.id(),
1110                           'json': 0}
1111            sc = self.start_stats(self.spec_reference + ".warmup",
1112                                  test_params=test_params,
1113                                  client_id=client_id)
1114
1115        self.log.info("preparing to warmup cluster ...")
1116
1117        server = self.input.servers[0]
1118        shell = RemoteMachineShellConnection(server)
1119
1120        start_time = time.time()
1121
1122        self.log.info("stopping couchbase ... ({0})".format(server.ip))
1123        shell.stop_couchbase()
1124        self.log.info("couchbase stopped ({0})".format(server.ip))
1125
1126        if flush_os_cache:
1127            self.log.info("flushing os cache ...")
1128            shell.flush_os_caches()
1129
1130        shell.start_couchbase()
1131        self.log.info("couchbase restarted ({0})".format(server.ip))
1132
1133        self.wait_until_warmed_up()
1134        self.log.info("warmup finished")
1135
1136        end_time = time.time()
1137        ops = {'tot-sets': 0,
1138               'tot-gets': 0,
1139               'tot-items': 0,
1140               'tot-creates': 0,
1141               'tot-misses': 0,
1142               "start-time": start_time,
1143               "end-time": end_time}
1144
1145        if collect_stats:
1146            self.end_stats(sc, ops, self.spec_reference + ".warmup")
1147
1148    def wait_until_warmed_up(self, master=None):
1149        if not master:
1150            master = self.input.servers[0]
1151
1152        bucket = self.param("bucket", "default")
1153
1154        fn = RebalanceHelper.wait_for_mc_stats_no_timeout
1155        for bucket in self.buckets:
1156            RebalanceHelper.wait_for_stats_on_all(master, bucket,
1157                                                  'ep_warmup_thread',
1158                                                  'complete', fn=fn)
1159    def set_param(self, name, val):
1160
1161        input = getattr(self, "input", TestInputSingleton.input)
1162        input.test_params[name] = str(val)
1163
1164        return True
1165
1166    def wait_for_task_completion(self, task='indexer'):
1167        """Wait for ns_server task to finish"""
1168        t0 = time.time()
1169        self.log.info("Waiting 30 seconds before {0} monitoring".format(task))
1170        time.sleep(30)
1171
1172        while True:
1173            tasks = self.rest.ns_server_tasks()
1174            if tasks:
1175                try:
1176                    progress = [t['progress'] for t in tasks if t['type'] == task]
1177                except TypeError:
1178                    self.log.error(tasks)
1179                else:
1180                    if progress:
1181                        self.log.info("{0} progress: {1}".format(task, progress))
1182                        time.sleep(10)
1183                    else:
1184                        break
1185
1186        t1 = time.time()
1187        self.log.info("Time taken to perform task: {0} sec".format(t1 - t0))
1188
1189    def param(self, name, default_value):
1190        input = getattr(self, "input", TestInputSingleton.input)
1191        return input.test_params.get(name, default_value)
1192
1193    def parami(self, name, default_int):
1194        return int(self.param(name, default_int))
1195
1196    def paramf(self, name, default_float):
1197        return float(self.param(name, default_float))
1198
1199    def params(self, name, default_str):
1200        return str(self.param(name, default_str))
1201