1d3c8f8afSfarshidimport json
268083873Skkumarimport re
3d3c8f8afSfarshidfrom threading import Thread
4d3c8f8afSfarshidimport time
56bef6d10Skkumarimport gzip
6fbfbed5aSPavel.Paulaufrom collections import defaultdict
77d4a8f10SPavel.Paulauimport logging
87d4a8f10SPavel.Paulauimport logging.config
9cc53ff73Spavel-paulaufrom uuid import uuid4
10ec399ee9Spavel-paulaufrom itertools import cycle
11d3c8f8afSfarshid
121d8a4755SPavel.Paulaufrom lib.membase.api.exception import SetViewInfoNotFound, ServerUnavailableException
133cd17a2eSPavel.Paulaufrom lib.membase.api.rest_client import RestConnection
14cc53ff73Spavel-paulaufrom lib.memcached.helper.data_helper import MemcachedClientHelper, VBucketAwareMemcached
153cd17a2eSPavel.Paulaufrom lib.remote.remote_util import RemoteMachineShellConnection, RemoteMachineHelper
163cd17a2eSPavel.Paulau
179eb741e4SPavel.Paulau
# Failures tolerated against one node before failing over to the next one
# (see get_bucket_size).
RETRIES = 10

# Logging is configured from a file at import time; paramiko is quieted.
logging.config.fileConfig('mcsoda.logging.conf')
logging.getLogger("paramiko").setLevel(logging.WARNING)
log = logging.getLogger()


def hex():
    """Return a fresh random UUID4 as a 32-character hex string.

    NOTE: this module-level name shadows the builtin ``hex``.
    """
    return uuid4().hex
25cc53ff73Spavel-paulau
266bb01fc9SPavel.Paulau
def histo_percentile(histo, percentiles):
    """Return the histogram bins at which the given percentiles are reached.

    The histo dict is returned by add_timing_sample() and maps bin -> count.
    The percentiles must be sorted ascending, like [0.90, 0.99]. Each
    percentile is paired with the smallest bin whose cumulative count reaches
    that fraction of the total count.

    :param histo: mapping of bin value -> sample count.
    :param percentiles: ascending fractions; the caller's list is not modified.
    :returns: list of ``(percentile, bin)`` tuples in percentile order.
        Percentiles that are never reached are omitted, e.g. an empty
        histogram or one whose counts sum to zero yields ``[]``.
    """
    bins = sorted(histo)
    total = float(sum(histo[b] for b in bins))
    if not total:
        # No observations: no percentile is defined (and dividing would fail).
        return []

    pending = list(percentiles)  # work on a copy so the caller's list survives
    rv = []
    running = 0  # cumulative count up to and including the current bin
    for b in bins:
        if not pending:
            break
        running += histo[b]
        while pending and (running / total) >= pending[0]:
            rv.append((pending[0], b))
            pending.pop(0)
    return rv
466bb01fc9SPavel.Paulau
47210f72cfSKaran Kumar
48d3c8f8afSfarshidclass StatsCollector(object):
498a9726c7Sfarshid    _task = {}
50d3c8f8afSfarshid    _verbosity = True
512deb21aaSPavel.Paulau    _mb_stats = {"snapshots": []}  # manually captured memcached stats
527798c19eSronniedada    _reb_stats = {}
535dc2e655Sronniedada    _lat_avg_stats = {}     # aggregated top level latency stats
54537654a8Spavel-paulau    _xdcr_stats = {}
55d3c8f8afSfarshid
56d3c8f8afSfarshid    def __init__(self, verbosity):
57d3c8f8afSfarshid        self._verbosity = verbosity
58242ed11bSSteve Yen        self.is_leader = False
594f888dc6SPavel.Paulau        self.active_mergers = 0
60d3c8f8afSfarshid
61e34ff75aSPavel.Paulau    def start(self, nodes, bucket, pnames, name, client_id='',
62cc53ff73Spavel-paulau              collect_server_stats=True, ddoc=None, clusters=None):
632deb21aaSPavel.Paulau        """This function starts collecting stats from all nodes with the given
64e34ff75aSPavel.Paulau        interval"""
652deb21aaSPavel.Paulau        self._task = {"state": "running", "threads": [], "name": name,
662deb21aaSPavel.Paulau                      "time": time.time(), "ops": [], "totalops": [],
672deb21aaSPavel.Paulau                      "ops-temp": [], "latency": {}, "data_size_stats": []}
685dd73acbSkkumar        rest = RestConnection(nodes[0])
695dd73acbSkkumar        info = rest.get_nodes_self()
705dd73acbSkkumar        self.data_path = info.storage[0].get_data_path()
71776378f4Skkumar        self.client_id = str(client_id)
7288c90b31SPavel.Paulau        self.nodes = nodes
7316d64f79SPavel.Paulau        self.bucket = bucket
74b1c03086SSteve Yen
75b1c03086SSteve Yen        if collect_server_stats:
76272f2114SPavel.Paulau            self._task["threads"].append(
77272f2114SPavel.Paulau                Thread(target=self.membase_stats, name="membase")
78272f2114SPavel.Paulau            )
79272f2114SPavel.Paulau            self._task["threads"].append(
80272f2114SPavel.Paulau                Thread(target=self.system_stats, name="system", args=(pnames, ))
81272f2114SPavel.Paulau            )
82272f2114SPavel.Paulau            self._task["threads"].append(
83272f2114SPavel.Paulau                Thread(target=self.iostats, name="iostats")
84272f2114SPavel.Paulau            )
85272f2114SPavel.Paulau            self._task["threads"].append(
86272f2114SPavel.Paulau                Thread(target=self.ns_server_stats, name="ns_server")
87272f2114SPavel.Paulau            )
88272f2114SPavel.Paulau            self._task["threads"].append(
89272f2114SPavel.Paulau                Thread(target=self.get_bucket_size, name="bucket_size")
90272f2114SPavel.Paulau            )
91ec399ee9Spavel-paulau            self._task["threads"].append(
923d8c63d0Spavel-paulau                Thread(target=self.rebalance_progress, name="rebalance_progress")
93ec399ee9Spavel-paulau            )
945476587bSPavel.Paulau            if ddoc is not None:
95272f2114SPavel.Paulau                self._task["threads"].append(
96272f2114SPavel.Paulau                    Thread(target=self.indexing_time_stats, name="index_time", args=(ddoc, ))
97272f2114SPavel.Paulau                )
98272f2114SPavel.Paulau                self._task["threads"].append(
99272f2114SPavel.Paulau                    Thread(target=self.indexing_throughput_stats, name="index_thr")
100272f2114SPavel.Paulau                )
101cc53ff73Spavel-paulau            if clusters:
102cc53ff73Spavel-paulau                self.clusters = clusters
103cc53ff73Spavel-paulau                self._task["threads"].append(
104cc53ff73Spavel-paulau                    Thread(target=self.xdcr_lag_stats, name="xdcr_lag_stats")
105cc53ff73Spavel-paulau                )
1065476587bSPavel.Paulau
107ca4cb325SPavel.Paulau            for thread in self._task["threads"]:
108ca4cb325SPavel.Paulau                thread.daemon = True
109ca4cb325SPavel.Paulau                thread.start()
110ca4cb325SPavel.Paulau
111b1c03086SSteve Yen            # Getting build/machine stats from only one node in the cluster
112b1c03086SSteve Yen            self.build_stats(nodes)
113b1c03086SSteve Yen            self.machine_stats(nodes)
114d3c8f8afSfarshid
11588c90b31SPavel.Paulau            # Start atop
11688c90b31SPavel.Paulau            self.start_atop()
11788c90b31SPavel.Paulau
1188a9726c7Sfarshid    def stop(self):
11988c90b31SPavel.Paulau        self.stop_atop()
1208a9726c7Sfarshid        self._task["state"] = "stopped"
1218a9726c7Sfarshid        for t in self._task["threads"]:
122316b5bdeSronniedada            t.join(120)
123316b5bdeSronniedada            if t.is_alive():
1240d4a7de4SPavel.Paulau                log.error("failed to join {0} thread".format(t.name))
125d3c8f8afSfarshid
1268a9726c7Sfarshid        self._task["time"] = time.time() - self._task["time"]
127d3c8f8afSfarshid
12860d4fd06SSteve Yen    def sample(self, cur):
12960d4fd06SSteve Yen        pass
13060d4fd06SSteve Yen
131537654a8Spavel-paulau    def get_ns_servers_samples(self, metric):
132537654a8Spavel-paulau        for subset in self._task["ns_server_data"]:
133537654a8Spavel-paulau            samples = subset["op"]["samples"][metric]
134537654a8Spavel-paulau            yield float(sum(samples)) / len(samples)
135537654a8Spavel-paulau
136537654a8Spavel-paulau    def calc_xperf_stats(self):
137537654a8Spavel-paulau        metrics = ("replication_changes_left", "xdc_ops")
138537654a8Spavel-paulau        for metric in metrics:
139537654a8Spavel-paulau            self._xdcr_stats["avg_" + metric] = \
140537654a8Spavel-paulau                sum(self.get_ns_servers_samples(metric)) /\
141537654a8Spavel-paulau                sum(1 for _ in self.get_ns_servers_samples(metric))
142537654a8Spavel-paulau
14351b7d937SKaran Kumar    def export(self, name, test_params):
14475bd9044SSteve Yen        for latency in self._task["latency"].keys():
145a3e12e23Sronniedada            # save the last histogram snapshot
1465dc2e655Sronniedada            per_90th_tot = 0
147210f72cfSKaran Kumar            histos = self._task["latency"].get(latency, [])
148a3e12e23Sronniedada            if histos:
149a3e12e23Sronniedada                key = latency + "-histogram"
150a3e12e23Sronniedada                self._task["latency"][key] = histos[-1].copy()
151a3e12e23Sronniedada                del self._task["latency"][key]["delta"]
152a3e12e23Sronniedada                self._task["latency"][key]["client_id"] = self.client_id
153a3e12e23Sronniedada
154a3e12e23Sronniedada            # calculate percentiles
155210f72cfSKaran Kumar            key = 'percentile-' + latency
156210f72cfSKaran Kumar            self._task["latency"][key] = []
157210f72cfSKaran Kumar            for histo in histos:
158a3e12e23Sronniedada                # for every sample histogram, produce a temp summary:
159a3e12e23Sronniedada                # temp = [90 per, 95 per, 99 per, client_id, delta]
16038b09640Skkumar                temp = []
16159b0778bSronniedada                time = histo['time']
162d16ebbbaSkkumar                delta = histo['delta']
16359b0778bSronniedada                del histo['delta'], histo['time']
164922ec29fSPavel.Paulau                p = histo_percentile(histo, [0.80, 0.90, 0.95, 0.99, 0.999])
16538b09640Skkumar                # p is list of tuples
16638b09640Skkumar                for val in p:
16738b09640Skkumar                    temp.append(val[-1])
1685dc2e655Sronniedada                per_90th_tot += temp[1]
16938b09640Skkumar                temp.append(self.client_id)
17059b0778bSronniedada                temp.append(time)
17138b09640Skkumar                temp.append(delta)
17238b09640Skkumar                self._task["latency"][key].append(temp)
1735dc2e655Sronniedada            if per_90th_tot:
1745dc2e655Sronniedada                self._lat_avg_stats["%s-90th-avg" % latency] \
1755dc2e655Sronniedada                    = per_90th_tot / len(histos) * 1000000
176210f72cfSKaran Kumar
177537654a8Spavel-paulau        # XDCR stats
178537654a8Spavel-paulau        try:
179537654a8Spavel-paulau            self.calc_xperf_stats()
180537654a8Spavel-paulau        except KeyError:
181537654a8Spavel-paulau            pass
182537654a8Spavel-paulau
183537654a8Spavel-paulau        test_params.update(self._xdcr_stats)
1847798c19eSronniedada        test_params.update(self._reb_stats)
1855dc2e655Sronniedada        test_params.update(self._lat_avg_stats)
1867798c19eSronniedada
1870e87c8d5SPavel.Paulau        obj = {
1880e87c8d5SPavel.Paulau            "buildinfo": self._task.get("buildstats", {}),
1890e87c8d5SPavel.Paulau            "machineinfo": self._task.get("machinestats", {}),
1900e87c8d5SPavel.Paulau            "membasestats": self._task.get("membasestats", []),
1910e87c8d5SPavel.Paulau            "systemstats": self._task.get("systemstats", []),
1920e87c8d5SPavel.Paulau            "iostats": self._task.get("iostats", []),
1930e87c8d5SPavel.Paulau            "name": name,
1940e87c8d5SPavel.Paulau            "totalops": self._task["totalops"],
1950e87c8d5SPavel.Paulau            "ops": self._task["ops"],
1960e87c8d5SPavel.Paulau            "time": self._task["time"],
1970e87c8d5SPavel.Paulau            "info": test_params,
1980e87c8d5SPavel.Paulau            "ns_server_data": self._task.get("ns_server_stats", []),
1990e87c8d5SPavel.Paulau            "ns_server_data_system": self._task.get("ns_server_stats_system", []),
2000e87c8d5SPavel.Paulau            "view_info": self._task.get("view_info", []),
2010e87c8d5SPavel.Paulau            "indexer_info": self._task.get("indexer_info", []),
202cc53ff73Spavel-paulau            "xdcr_lag": self._task.get("xdcr_lag", []),
203ec399ee9Spavel-paulau            "rebalance_progress": self._task.get("rebalance_progress", []),
2040e87c8d5SPavel.Paulau            "timings": self._task.get("timings", []),
2050e87c8d5SPavel.Paulau            "dispatcher": self._task.get("dispatcher", []),
2060e87c8d5SPavel.Paulau            "bucket-size": self._task.get("bucket_size", []),
2070e87c8d5SPavel.Paulau            "data-size": self._task.get("data_size_stats", []),
2080e87c8d5SPavel.Paulau            "latency-set-histogram": self._task["latency"].get("latency-set-histogram", []),
2090e87c8d5SPavel.Paulau            "latency-set": self._task["latency"].get('percentile-latency-set', []),
2100e87c8d5SPavel.Paulau            "latency-set-recent": self._task["latency"].get('percentile-latency-set-recent', []),
2110e87c8d5SPavel.Paulau            "latency-get-histogram": self._task["latency"].get("latency-get-histogram", []),
2120e87c8d5SPavel.Paulau            "latency-get": self._task["latency"].get('percentile-latency-get', []),
2130e87c8d5SPavel.Paulau            "latency-get-recent": self._task["latency"].get('percentile-latency-get-recent', []),
2140e87c8d5SPavel.Paulau            "latency-delete": self._task["latency"].get('percentile-latency-delete', []),
2150e87c8d5SPavel.Paulau            "latency-delete-recent": self._task["latency"].get('percentile-latency-delete-recent', []),
2160e87c8d5SPavel.Paulau            "latency-query-histogram": self._task["latency"].get("latency-query-histogram", []),
2170e87c8d5SPavel.Paulau            "latency-query": self._task["latency"].get('percentile-latency-query', []),
2180e87c8d5SPavel.Paulau            "latency-query-recent": self._task["latency"].get('percentile-latency-query-recent', []),
2190e87c8d5SPavel.Paulau            "latency-obs-persist-server-histogram": self._task["latency"].get("latency-obs-persist-server-histogram", []),
2200e87c8d5SPavel.Paulau            "latency-obs-persist-server": self._task["latency"].get('percentile-latency-obs-persist-server-server', []),
2210e87c8d5SPavel.Paulau            "latency-obs-persist-server-recent": self._task["latency"].get('percentile-latency-obs-persist-server-recent', []),
2220e87c8d5SPavel.Paulau            "latency-obs-persist-client-histogram": self._task["latency"].get("latency-obs-persist-client-histogram", []),
2230e87c8d5SPavel.Paulau            "latency-obs-persist-client": self._task["latency"].get('percentile-latency-obs-persist-client', []),
2240e87c8d5SPavel.Paulau            "latency-obs-persist-client-recent": self._task["latency"].get('percentile-latency-obs-persist-client-recent', []),
2250e87c8d5SPavel.Paulau            "latency-obs-repl-client-histogram": self._task["latency"].get("latency-obs-repl-client-histogram", []),
2260e87c8d5SPavel.Paulau            "latency-obs-repl-client": self._task["latency"].get('percentile-latency-obs-repl-client', []),
2270e87c8d5SPavel.Paulau            "latency-obs-repl-client-recent": self._task["latency"].get('percentile-latency-obs-repl-client-recent', []),
2280e87c8d5SPavel.Paulau            "latency-woq-obs-histogram": self._task["latency"].get("latency-woq-obs-histogram", []),
2290e87c8d5SPavel.Paulau            "latency-woq-obs": self._task["latency"].get('percentile-latency-woq-obs', []),
2300e87c8d5SPavel.Paulau            "latency-woq-obs-recent": self._task["latency"].get('percentile-latency-woq-obs-recent', []),
2310e87c8d5SPavel.Paulau            "latency-woq-query-histogram": self._task["latency"].get("latency-woq-query-histogram", []),
2320e87c8d5SPavel.Paulau            "latency-woq-query": self._task["latency"].get('percentile-latency-woq-query', []),
2330e87c8d5SPavel.Paulau            "latency-woq-query-recent": self._task["latency"].get('percentile-latency-woq-query-recent', []),
2340e87c8d5SPavel.Paulau            "latency-woq-histogram": self._task["latency"].get("latency-woq-histogram", []),
2350e87c8d5SPavel.Paulau            "latency-woq": self._task["latency"].get('percentile-latency-woq', []),
2360e87c8d5SPavel.Paulau            "latency-woq-recent": self._task["latency"].get('percentile-latency-woq-recent', []),
2370e87c8d5SPavel.Paulau            "latency-cor-histogram": self._task["latency"].get("latency-cor-histogram", []),
2380e87c8d5SPavel.Paulau            "latency-cor": self._task["latency"].get('percentile-latency-cor', []),
2390e87c8d5SPavel.Paulau            "latency-cor-recent": self._task["latency"].get('percentile-latency-cor-recent', [])}
240ad14a298SKaran Kumar
241ad14a298SKaran Kumar        if self.client_id:
2420e87c8d5SPavel.Paulau            patterns = ('reload$', 'load$', 'warmup$', 'index$')
2430e87c8d5SPavel.Paulau            phases = ('.reload', '.load', '.warmup', '.index')
2445f004269SPavel.Paulau            name_picker = lambda (pattern, phase): re.search(pattern, self._task["name"])
2455f004269SPavel.Paulau            try:
2465f004269SPavel.Paulau                phase = filter(name_picker, zip(patterns, phases))[0][1]
2475f004269SPavel.Paulau            except IndexError:
2485f004269SPavel.Paulau                phase = '.loop'
2490e87c8d5SPavel.Paulau            name = str(self.client_id) + phase
2505f004269SPavel.Paulau
251a9db2583SPavel.Paulau        file = gzip.open("{0}.json.gz".format(name), 'wb')
252a9db2583SPavel.Paulau        file.write(json.dumps(obj))
253a9db2583SPavel.Paulau        file.close()
254d3c8f8afSfarshid
25516d64f79SPavel.Paulau    def get_bucket_size(self, interval=60):
25614d74ac2SKaran Kumar        self._task["bucket_size"] = []
257dca19597Sronniedada        retries = 0
25816d64f79SPavel.Paulau        nodes_iterator = (node for node in self.nodes)
259dca19597Sronniedada        node = nodes_iterator.next()
260dca19597Sronniedada        rest = RestConnection(node)
26114d74ac2SKaran Kumar        while not self._aborted():
262e34ff75aSPavel.Paulau            time.sleep(interval)
2630d4a7de4SPavel.Paulau            log.info("collecting bucket size stats")
264dca19597Sronniedada            try:
26516d64f79SPavel.Paulau                status, db_size = rest.get_database_disk_size(self.bucket)
266dca19597Sronniedada                if status:
267dca19597Sronniedada                    self._task["bucket_size"].append(db_size)
268dca19597Sronniedada            except IndexError, e:
269dca19597Sronniedada                retries += 1
27016d64f79SPavel.Paulau                log.error("unable to get bucket size {0}: {1}"
27116d64f79SPavel.Paulau                          .format(self.bucket, e))
2720d4a7de4SPavel.Paulau                log.warning("retries: {0} of {1}".format(retries, RETRIES))
273dca19597Sronniedada                if retries == RETRIES:
274dca19597Sronniedada                    try:
275dca19597Sronniedada                        node = nodes_iterator.next()
276dca19597Sronniedada                        rest = RestConnection(node)
277dca19597Sronniedada                        retries = 0
278dca19597Sronniedada                    except StopIteration:
2790d4a7de4SPavel.Paulau                        log.error("no nodes available: stop collecting bucket_size")
280dca19597Sronniedada                        return
28114d74ac2SKaran Kumar
2820d4a7de4SPavel.Paulau        log.info("finished bucket size stats")
283d3c8f8afSfarshid
284e34ff75aSPavel.Paulau    def get_data_file_size(self, nodes, interval, bucket):
2857471ca4aSKaran Kumar        shells = []
2867471ca4aSKaran Kumar        for node in nodes:
2877471ca4aSKaran Kumar            try:
2887471ca4aSKaran Kumar                shells.append(RemoteMachineShellConnection(node))
289e87da15eSPavel.Paulau            except Exception as error:
290e87da15eSPavel.Paulau                log.error(error)
2917471ca4aSKaran Kumar        paths = []
29248c105c0SAndrei Baranouski        if shells[0].is_couchbase_installed():
29348c105c0SAndrei Baranouski            bucket_path = self.data_path + '/{0}'.format(bucket)
2947471ca4aSKaran Kumar            paths.append(bucket_path)
29548c105c0SAndrei Baranouski            view_path = bucket_path + '/set_view_{0}_design'.format(bucket)
2967471ca4aSKaran Kumar            paths.append(view_path)
29748c105c0SAndrei Baranouski        else:
29848c105c0SAndrei Baranouski            paths.append(self.data_path + '/{0}-data'.format(bucket))
29948c105c0SAndrei Baranouski
3007471ca4aSKaran Kumar        d = {"snapshots": []}
3017471ca4aSKaran Kumar        start_time = str(self._task["time"])
3027471ca4aSKaran Kumar
3037471ca4aSKaran Kumar        while not self._aborted():
304e34ff75aSPavel.Paulau            time.sleep(interval)
3057471ca4aSKaran Kumar            current_time = time.time()
3067471ca4aSKaran Kumar            i = 0
3077471ca4aSKaran Kumar            for shell in shells:
3087471ca4aSKaran Kumar                node = nodes[i]
30948c105c0SAndrei Baranouski                unique_id = node.ip + '-' + start_time
3107471ca4aSKaran Kumar                value = {}
3117471ca4aSKaran Kumar                for path in paths:
3127471ca4aSKaran Kumar                    size = shell.get_data_file_size(path)
3137471ca4aSKaran Kumar                    value["file"] = path.split('/')[-1]
3147471ca4aSKaran Kumar                    value["size"] = size
3157471ca4aSKaran Kumar                    value["unique_id"] = unique_id
3167471ca4aSKaran Kumar                    value["time"] = current_time
3177471ca4aSKaran Kumar                    value["ip"] = node.ip
3185dd73acbSkkumar                    d["snapshots"].append(value.copy())
31948c105c0SAndrei Baranouski                i += 1
3207471ca4aSKaran Kumar        self._task["data_size_stats"] = d["snapshots"]
3210d4a7de4SPavel.Paulau        log.info("finished data_size_stats")
3227471ca4aSKaran Kumar
323d3c8f8afSfarshid    #ops stats
324f1115277SSteve Yen    #{'tot-sets': 899999, 'tot-gets': 1, 'tot-items': 899999, 'tot-creates': 899999}
3258a9726c7Sfarshid    def ops_stats(self, ops_stat):
3268a9726c7Sfarshid        ops_stat["time"] = time.time()
3278a9726c7Sfarshid        self._task["ops-temp"].append(ops_stat)
3283c37430fSPavel.Paulau        if len(self._task["ops-temp"]) >= 500 * (1 + self.active_mergers):
329982ea73cSPavel.Paulau            # Prevent concurrent merge
3304f888dc6SPavel.Paulau            while self.active_mergers:
331f3cbc498SPavel.Paulau                time.sleep(0.1)
332982ea73cSPavel.Paulau
3334f888dc6SPavel.Paulau            # Semaphore: +1 active
3344f888dc6SPavel.Paulau            self.active_mergers += 1
335982ea73cSPavel.Paulau
336982ea73cSPavel.Paulau            # Merge
3378a9726c7Sfarshid            merged = self._merge()
3388a9726c7Sfarshid            self._task["ops"].append(merged)
339245b7798SPavel.Paulau            self._task["ops-temp"] = self._task["ops-temp"][500:]
3408a9726c7Sfarshid
3414f888dc6SPavel.Paulau            # Semaphore: -1 active
3424f888dc6SPavel.Paulau            self.active_mergers -= 1
343982ea73cSPavel.Paulau
3448a9726c7Sfarshid        #if self._task["ops"] has more than 1000 elements try to aggregate them ?
3458a9726c7Sfarshid
346241237ccSronniedada    def latency_stats(self, latency_cmd, latency_stat, cur_time=0):
347557309d0SKaran Kumar        if self._task["latency"].get(latency_cmd) is None:
348557309d0SKaran Kumar            self._task["latency"][latency_cmd] = []
349d16ebbbaSkkumar        temp_latency_stat = latency_stat.copy()
350241237ccSronniedada        if not cur_time:
351241237ccSronniedada            cur_time = time.time()
35259b0778bSronniedada        temp_latency_stat['time'] = int(cur_time)
35359b0778bSronniedada        temp_latency_stat['delta'] = cur_time - self._task['time']
354d16ebbbaSkkumar        self._task["latency"][latency_cmd].append(temp_latency_stat)
355f1115277SSteve Yen
3568a9726c7Sfarshid    def _merge(self):
3578a9726c7Sfarshid        first = self._task["ops-temp"][0]
35890c3607aSPavel.Paulau        merged = {"startTime": first["start-time"]}
3598a9726c7Sfarshid        totalgets = 0
3608a9726c7Sfarshid        totalsets = 0
361de431f23SPavel.Paulau        totalqueries = 0
3628a9726c7Sfarshid        delta = 0
363245b7798SPavel.Paulau        for i in range(499):
364c905cf28SKaran Kumar            current = self._task["ops-temp"][i]
36548c105c0SAndrei Baranouski            next = self._task["ops-temp"][i + 1]
366c905cf28SKaran Kumar            totalgets += current["tot-gets"]
367c905cf28SKaran Kumar            totalsets += current["tot-sets"]
368de431f23SPavel.Paulau            totalqueries += current["tot-queries"]
369c905cf28SKaran Kumar            delta += (next["start-time"] - current["start-time"])
3701f02afc0SKaran Kumar        merged["endTime"] = merged["startTime"] + delta
3711f02afc0SKaran Kumar        merged["totalSets"] = totalsets
3721f02afc0SKaran Kumar        merged["totalGets"] = totalgets
373de431f23SPavel.Paulau        merged["totalQueries"] = totalqueries
374de431f23SPavel.Paulau        qps = totalqueries / float(delta)
375de431f23SPavel.Paulau        merged["queriesPerSec"] = qps
3768a9726c7Sfarshid        return merged
3778a9726c7Sfarshid
3788a9726c7Sfarshid    def total_stats(self, ops_stat):
379d3c8f8afSfarshid        ops_stat["time"] = time.time()
380dfab8bacSfarshid        self._task["totalops"].append(ops_stat)
381d3c8f8afSfarshid
38248c105c0SAndrei Baranouski    def build_stats(self, nodes):
383d3c8f8afSfarshid        json_response = StatUtil.build_info(nodes[0])
3848a9726c7Sfarshid        self._task["buildstats"] = json_response
385d3c8f8afSfarshid
38648c105c0SAndrei Baranouski    def machine_stats(self, nodes):
387601facbcSKaran Kumar        machine_stats = StatUtil.machine_info(nodes[0])
3888a9726c7Sfarshid        self._task["machinestats"] = machine_stats
389d3c8f8afSfarshid
390035fac5bSronniedada    def reb_stats(self, start, dur):
3910d4a7de4SPavel.Paulau        log.info("recording reb start = {0}, reb duration = {1}".format(start, dur))
392035fac5bSronniedada        self._reb_stats["reb_start"] = start
3937798c19eSronniedada        self._reb_stats["reb_dur"] = dur
3947798c19eSronniedada
395d3c8f8afSfarshid    def _extract_proc_info(self, shell, pid):
3962b496d28SPavel.Paulau        output, error = shell.execute_command("cat /proc/{0}/stat".format(pid))
3972b496d28SPavel.Paulau        fields = (
3982b496d28SPavel.Paulau            'pid', 'comm', 'state', 'ppid', 'pgrp', 'session', 'tty_nr',
3992b496d28SPavel.Paulau            'tpgid', 'flags', 'minflt', 'cminflt', 'majflt', 'cmajflt',
4002b496d28SPavel.Paulau            'utime', 'stime', 'cutime', 'cstime', 'priority ' 'nice',
4012b496d28SPavel.Paulau            'num_threads', 'itrealvalue', 'starttime', 'vsize', 'rss',
4022b496d28SPavel.Paulau            'rsslim', 'startcode', 'endcode', 'startstack', 'kstkesp',
4032b496d28SPavel.Paulau            'kstkeip', 'signal', 'blocked ', 'sigignore', 'sigcatch', 'wchan',
4042b496d28SPavel.Paulau            'nswap', 'cnswap', 'exit_signal', 'processor', 'rt_priority',
4052b496d28SPavel.Paulau            'policy', 'delayacct_blkio_ticks', 'guest_time', 'cguest_time')
4062b496d28SPavel.Paulau
4072b496d28SPavel.Paulau        return {} if error else dict(zip(fields, output[0].split(' ')))
408d3c8f8afSfarshid
409ac3005adSronniedada    def _extract_io_info(self, shell):
410ac3005adSronniedada        """
411ec733748Sronniedada        Extract info from iostat
412ec733748Sronniedada
413ec733748Sronniedada        Output:
414ec733748Sronniedada
415ec733748Sronniedada        [kB_read, kB_wrtn, %util, %iowait, %idle]
416ec733748Sronniedada
417ec733748Sronniedada        Rough Benchmarks:
418ec733748Sronniedada        My local box (WIFI LAN - VM), took ~1.2 sec for this routine
419ac3005adSronniedada        """
420ac3005adSronniedada        CMD = "iostat -dk | grep 'sd. ' | " \
421ac3005adSronniedada              "awk '{read+=$5; write+=$6} END { print read, write }'"
422ec733748Sronniedada        out, err = shell.execute_command(CMD)
423ec733748Sronniedada        results = out[0]
424ec733748Sronniedada
425ec733748Sronniedada        CMD = "iostat -dkx | grep 'sd. ' | "\
426ec733748Sronniedada              "awk '{util+=$12} END { print util/NR }'"
427ec733748Sronniedada        out, err = shell.execute_command(CMD)
428ec733748Sronniedada        results = "%s %s" % (results, out[0])
429ac3005adSronniedada
430ec733748Sronniedada        CMD = "iostat 1 2 -c | awk 'NR == 7 { print $4, $6 }'"
431ac3005adSronniedada        out, err = shell.execute_command(CMD)
432ec733748Sronniedada        results = "%s %s" % (results, out[0])
433ac3005adSronniedada
434ec733748Sronniedada        return results.split(' ')
435ac3005adSronniedada
43616d64f79SPavel.Paulau    def system_stats(self, pnames, interval=10):
4378a9726c7Sfarshid        shells = []
43816d64f79SPavel.Paulau        for node in self.nodes:
4398a9726c7Sfarshid            try:
4408a9726c7Sfarshid                shells.append(RemoteMachineShellConnection(node))
441f86dc70fSronniedada            except Exception, error:
442e87da15eSPavel.Paulau                log.error(error)
443d3c8f8afSfarshid        d = {"snapshots": []}
444d3c8f8afSfarshid        #        "pname":"x","pid":"y","snapshots":[{"time":time,"value":value}]
44552332309SKaran Kumar
44680db7247SKaran Kumar        start_time = str(self._task["time"])
4478a9726c7Sfarshid        while not self._aborted():
448e34ff75aSPavel.Paulau            time.sleep(interval)
449ab2e51dfSKaran Kumar            current_time = time.time()
45052332309SKaran Kumar            i = 0
451d3c8f8afSfarshid            for shell in shells:
45216d64f79SPavel.Paulau                node = self.nodes[i]
45348c105c0SAndrei Baranouski                unique_id = node.ip + '-' + start_time
454d3c8f8afSfarshid                for pname in pnames:
455d3c8f8afSfarshid                    obj = RemoteMachineHelper(shell).is_process_running(pname)
456d3c8f8afSfarshid                    if obj and obj.pid:
457d3c8f8afSfarshid                        value = self._extract_proc_info(shell, obj.pid)
458d3c8f8afSfarshid                        value["name"] = pname
459d3c8f8afSfarshid                        value["id"] = obj.pid
46052332309SKaran Kumar                        value["unique_id"] = unique_id
461ab2e51dfSKaran Kumar                        value["time"] = current_time
46252332309SKaran Kumar                        value["ip"] = node.ip
463d3c8f8afSfarshid                        d["snapshots"].append(value)
46448c105c0SAndrei Baranouski                i += 1
4658a9726c7Sfarshid        self._task["systemstats"] = d["snapshots"]
4660d4a7de4SPavel.Paulau        log.info("finished system_stats")
467d3c8f8afSfarshid
46816d64f79SPavel.Paulau    def iostats(self, interval=10):
469ac3005adSronniedada        shells = []
47016d64f79SPavel.Paulau        for node in self.nodes:
471ac3005adSronniedada            try:
472ac3005adSronniedada                shells.append(RemoteMachineShellConnection(node))
473f86dc70fSronniedada            except Exception, error:
474e87da15eSPavel.Paulau                log.error(error)
475ac3005adSronniedada
476ac3005adSronniedada        self._task["iostats"] = []
477ac3005adSronniedada
4780d4a7de4SPavel.Paulau        log.info("started capturing io stats")
479ac3005adSronniedada
480ac3005adSronniedada        while not self._aborted():
481e34ff75aSPavel.Paulau            time.sleep(interval)
4820d4a7de4SPavel.Paulau            log.info("collecting io stats")
483ac3005adSronniedada            for shell in shells:
484ec733748Sronniedada                try:
485ec733748Sronniedada                    kB_read, kB_wrtn, util, iowait, idle = \
486ec733748Sronniedada                        self._extract_io_info(shell)
48763ff2ccfSPavel.Paulau                except (ValueError, TypeError, IndexError):
488ec733748Sronniedada                    continue
489ac3005adSronniedada                if kB_read and kB_wrtn:
490ac3005adSronniedada                    self._task["iostats"].append({"time": time.time(),
491ac3005adSronniedada                                                 "ip": shell.ip,
492ac3005adSronniedada                                                 "read": kB_read,
493ec733748Sronniedada                                                 "write": kB_wrtn,
494ec733748Sronniedada                                                 "util": util,
495ec733748Sronniedada                                                 "iowait": iowait,
496ec733748Sronniedada                                                 "idle": idle})
4970d4a7de4SPavel.Paulau        log.info("finished capturing io stats")
498ac3005adSronniedada
49926ec735aSronniedada    def capture_mb_snapshot(self, node):
50071d35cd5SPavel.Paulau        """Capture membase stats snapshot manually"""
5010d4a7de4SPavel.Paulau        log.info("capturing memcache stats snapshot for {0}".format(node.ip))
50226ec735aSronniedada        stats = {}
50326ec735aSronniedada
50426ec735aSronniedada        try:
50526ec735aSronniedada            bucket = RestConnection(node).get_buckets()[0].name
50626ec735aSronniedada            mc = MemcachedClientHelper.direct_client(node, bucket)
50726ec735aSronniedada            stats = mc.stats()
508109d4591Sronniedada            stats.update(mc.stats("warmup"))
509f86dc70fSronniedada        except Exception, e:
5100d4a7de4SPavel.Paulau            log.error(e)
51126ec735aSronniedada            return False
512ca4d2d67Sronniedada        finally:
513ca4d2d67Sronniedada            stats["time"] = time.time()
514ca4d2d67Sronniedada            stats["ip"] = node.ip
515ca4d2d67Sronniedada            self._mb_stats["snapshots"].append(stats)
516ca4d2d67Sronniedada            print stats
51726ec735aSronniedada
5180d4a7de4SPavel.Paulau        log.info("memcache stats snapshot captured")
51926ec735aSronniedada        return True
52026ec735aSronniedada
52116d64f79SPavel.Paulau    def membase_stats(self, interval=60):
5228a9726c7Sfarshid        mcs = []
52316d64f79SPavel.Paulau        for node in self.nodes:
5248a9726c7Sfarshid            try:
5258a9726c7Sfarshid                bucket = RestConnection(node).get_buckets()[0].name
526f07c6108SPavel.Paulau                mc = MemcachedClientHelper.direct_client(node, bucket)
527f07c6108SPavel.Paulau                mcs.append(mc)
528f86dc70fSronniedada            except Exception, error:
529e87da15eSPavel.Paulau                log.error(error)
5308a9726c7Sfarshid        self._task["membasestats"] = []
5317f31ef8bSKaran Kumar        self._task["timings"] = []
5327f31ef8bSKaran Kumar        self._task["dispatcher"] = []
533f07c6108SPavel.Paulau        data = dict()
534d3c8f8afSfarshid        for mc in mcs:
535f07c6108SPavel.Paulau            data[mc.host] = {"snapshots": [], "timings": [], "dispatcher": []}
536d3c8f8afSfarshid
5378a9726c7Sfarshid        while not self._aborted():
538e34ff75aSPavel.Paulau            time.sleep(interval)
539f07c6108SPavel.Paulau            log.info("collecting membase stats")
540d3c8f8afSfarshid            for mc in mcs:
541f07c6108SPavel.Paulau                for rerty in xrange(RETRIES):
5425b65c083Sronniedada                    try:
5435b65c083Sronniedada                        stats = mc.stats()
5443264e7b3Sronniedada                    except Exception as e:
545f07c6108SPavel.Paulau                        log.warn("{0}, retries = {1}".format(str(e), rerty))
5465b65c083Sronniedada                        time.sleep(2)
5475b65c083Sronniedada                        mc.reconnect()
548f07c6108SPavel.Paulau                    else:
549f07c6108SPavel.Paulau                        break
550f07c6108SPavel.Paulau                else:
551f07c6108SPavel.Paulau                    stats = {}
552f07c6108SPavel.Paulau                data[mc.host]["snapshots"].append(stats)
553d3c8f8afSfarshid
554f07c6108SPavel.Paulau                for arg in ("timings", "dispatcher"):
555f07c6108SPavel.Paulau                    try:
556f07c6108SPavel.Paulau                        stats = mc.stats(arg)
557f07c6108SPavel.Paulau                        data[mc.host][arg].append(stats)
558f07c6108SPavel.Paulau                    except EOFError, e:
5590d4a7de4SPavel.Paulau                        log.error("unable to get {0} stats {1}: {2}"
560f07c6108SPavel.Paulau                                  .format(arg, mc.host, e))
561f07c6108SPavel.Paulau
562f07c6108SPavel.Paulau        for host in (mc.host for mc in mcs):
563f07c6108SPavel.Paulau            unique_id = host + '-' + str(self._task["time"])
564ab2e51dfSKaran Kumar            current_time = time.time()
565f07c6108SPavel.Paulau
566f07c6108SPavel.Paulau            if self._mb_stats["snapshots"]:  # use manually captured stats
56726ec735aSronniedada                self._task["membasestats"] = self._mb_stats["snapshots"]
568f07c6108SPavel.Paulau            else:  # use periodically captured stats
569f07c6108SPavel.Paulau                for snapshot in data[host]["snapshots"]:
570f07c6108SPavel.Paulau                    snapshot["unique_id"] = unique_id
571f07c6108SPavel.Paulau                    snapshot["time"] = current_time
572f07c6108SPavel.Paulau                    snapshot["ip"] = host
57326ec735aSronniedada                    self._task["membasestats"].append(snapshot)
574f07c6108SPavel.Paulau
575f07c6108SPavel.Paulau            for timing in data[host]["timings"]:
576f07c6108SPavel.Paulau                timing["unique_id"] = unique_id
577f07c6108SPavel.Paulau                timing["time"] = current_time
578f07c6108SPavel.Paulau                timing["ip"] = host
5797f31ef8bSKaran Kumar                self._task["timings"].append(timing)
580f07c6108SPavel.Paulau
581f07c6108SPavel.Paulau            for dispatcher in data[host]["dispatcher"]:
582f07c6108SPavel.Paulau                dispatcher["unique_id"] = unique_id
583f07c6108SPavel.Paulau                dispatcher["time"] = current_time
584f07c6108SPavel.Paulau                dispatcher["ip"] = host
5857f31ef8bSKaran Kumar                self._task["dispatcher"].append(dispatcher)
586d3c8f8afSfarshid
587f07c6108SPavel.Paulau            if data[host]["timings"]:
588f07c6108SPavel.Paulau                log.info("dumping disk timing stats: {0}".format(host))
589f07c6108SPavel.Paulau                latests_timings = data[host]["timings"][-1]
590f07c6108SPavel.Paulau                for key, value in sorted(latests_timings.iteritems()):
591f07c6108SPavel.Paulau                    if key.startswith("disk"):
592f07c6108SPavel.Paulau                        print "{0:50s}: {1}".format(key, value)
593937ec23aSPavel.Paulau
594f07c6108SPavel.Paulau        log.info("finished membase_stats")
595d3c8f8afSfarshid
59616d64f79SPavel.Paulau    def ns_server_stats(self, interval=60):
597d710b3b8SKaran Kumar        self._task["ns_server_stats"] = []
5982f86911dSkkumar        self._task["ns_server_stats_system"] = []
59916d64f79SPavel.Paulau        nodes_iterator = (node for node in self.nodes)
600dbc10854Sronniedada        node = nodes_iterator.next()
601dbc10854Sronniedada        retries = 0
60249bd6706SPavel.Paulau        not_null = lambda v: v if v is not None else 0
603d710b3b8SKaran Kumar
604dbc10854Sronniedada        rest = RestConnection(node)
605d710b3b8SKaran Kumar        while not self._aborted():
606e34ff75aSPavel.Paulau            time.sleep(interval)
6070d4a7de4SPavel.Paulau            log.info("collecting ns_server_stats")
608dbc10854Sronniedada            try:
609dbc10854Sronniedada                # Bucket stats
61016d64f79SPavel.Paulau                ns_server_stats = rest.fetch_bucket_stats(bucket=self.bucket)
611dbc10854Sronniedada                for key, value in ns_server_stats["op"]["samples"].iteritems():
612dbc10854Sronniedada                    ns_server_stats["op"]["samples"][key] = not_null(value)
613dbc10854Sronniedada                self._task["ns_server_stats"].append(ns_server_stats)
614dbc10854Sronniedada                # System stats
615dbc10854Sronniedada                ns_server_stats_system = rest.fetch_system_stats()
616dbc10854Sronniedada                self._task["ns_server_stats_system"].append(ns_server_stats_system)
61758f40fe3Spavel-paulau            except ServerUnavailableException, e:
61858f40fe3Spavel-paulau                log.error(e)
61971513a57SPavel.Paulau            except (ValueError, TypeError), e:
6200d4a7de4SPavel.Paulau                log.error("unable to parse json object {0}: {1}".format(node, e))
62158f40fe3Spavel-paulau            else:
62258f40fe3Spavel-paulau                continue
62358f40fe3Spavel-paulau            retries += 1
62458f40fe3Spavel-paulau            if retries <= RETRIES:
6250d4a7de4SPavel.Paulau                log.warning("retries: {0} of {1}".format(retries, RETRIES))
62658f40fe3Spavel-paulau            else:
62758f40fe3Spavel-paulau                try:
62858f40fe3Spavel-paulau                    node = nodes_iterator.next()
62958f40fe3Spavel-paulau                    rest = RestConnection(node)
63058f40fe3Spavel-paulau                    retries = 0
63158f40fe3Spavel-paulau                except StopIteration:
63258f40fe3Spavel-paulau                    log.error("no nodes available: stop collecting ns_server_stats")
63358f40fe3Spavel-paulau                    return
634d710b3b8SKaran Kumar
6350d4a7de4SPavel.Paulau        log.info("finished ns_server_stats")
636d710b3b8SKaran Kumar
637272f2114SPavel.Paulau    def indexing_time_stats(self, ddoc, interval=60):
6385476587bSPavel.Paulau        """Collect view indexing stats"""
6395476587bSPavel.Paulau        self._task['view_info'] = list()
6405476587bSPavel.Paulau
64116d64f79SPavel.Paulau        rests = [RestConnection(node) for node in self.nodes]
6425476587bSPavel.Paulau        while not self._aborted():
643e34ff75aSPavel.Paulau            time.sleep(interval)
6440d4a7de4SPavel.Paulau            log.info("collecting view indexing stats")
6450e7be65dSPavel.Paulau            for rest in rests:
64623c4aceeSPavel.Paulau                try:
64716d64f79SPavel.Paulau                    data = rest.set_view_info(self.bucket, ddoc)
6481d8a4755SPavel.Paulau                except (SetViewInfoNotFound, ServerUnavailableException), error:
6491d8a4755SPavel.Paulau                    log.error(error)
65023c4aceeSPavel.Paulau                    continue
6515476587bSPavel.Paulau                try:
6522ad6d3a3SPavel.Paulau                    update_history = data[1]['stats']['update_history']
6535476587bSPavel.Paulau                    indexing_time = \
6545476587bSPavel.Paulau                        [event['indexing_time'] for event in update_history]
6555476587bSPavel.Paulau                    avg_time = sum(indexing_time) / len(indexing_time)
6562ad6d3a3SPavel.Paulau                except (IndexError, KeyError,