1import json
2import re
3from threading import Thread
4import time
5import gzip
6from collections import defaultdict
7import logging
8import logging.config
9from uuid import uuid4
10from itertools import cycle
11
12from lib.membase.api.exception import SetViewInfoNotFound, ServerUnavailableException
13from lib.membase.api.rest_client import RestConnection
14from lib.memcached.helper.data_helper import MemcachedClientHelper, VBucketAwareMemcached
15from lib.remote.remote_util import RemoteMachineShellConnection, RemoteMachineHelper
16
17
18RETRIES = 10
19
20logging.config.fileConfig('mcsoda.logging.conf')
21logging.getLogger("paramiko").setLevel(logging.WARNING)
22log = logging.getLogger()
23
24hex = lambda: uuid4().hex
25
26
27def histo_percentile(histo, percentiles):
28    """The histo dict is returned by add_timing_sample(). The percentiles must
29    be sorted, ascending, like [0.90, 0.99]."""
30    v_sum = 0
31    bins = histo.keys()
32    bins.sort()
33    for bin in bins:
34        v_sum += histo[bin]
35    v_sum = float(v_sum)
36    v_cur = 0  # Running total.
37    rv = []
38    for bin in bins:
39        if not percentiles:
40            return rv
41        v_cur += histo[bin]
42        while percentiles and (v_cur / v_sum) >= percentiles[0]:
43            rv.append((percentiles[0], bin))
44            percentiles.pop(0)
45    return rv
46
47
48class StatsCollector(object):
49    _task = {}
50    _verbosity = True
51    _mb_stats = {"snapshots": []}  # manually captured memcached stats
52    _reb_stats = {}
53    _lat_avg_stats = {}     # aggregated top level latency stats
54    _xdcr_stats = {}
55
56    def __init__(self, verbosity):
57        self._verbosity = verbosity
58        self.is_leader = False
59        self.active_mergers = 0
60
61    def start(self, nodes, bucket, pnames, name, client_id='',
62              collect_server_stats=True, ddoc=None, clusters=None):
63        """This function starts collecting stats from all nodes with the given
64        interval"""
65        self._task = {"state": "running", "threads": [], "name": name,
66                      "time": time.time(), "ops": [], "totalops": [],
67                      "ops-temp": [], "latency": {}, "data_size_stats": []}
68        rest = RestConnection(nodes[0])
69        info = rest.get_nodes_self()
70        self.data_path = info.storage[0].get_data_path()
71        self.client_id = str(client_id)
72        self.nodes = nodes
73        self.bucket = bucket
74
75        if collect_server_stats:
76            self._task["threads"].append(
77                Thread(target=self.membase_stats, name="membase")
78            )
79            self._task["threads"].append(
80                Thread(target=self.system_stats, name="system", args=(pnames, ))
81            )
82            self._task["threads"].append(
83                Thread(target=self.iostats, name="iostats")
84            )
85            self._task["threads"].append(
86                Thread(target=self.ns_server_stats, name="ns_server")
87            )
88            self._task["threads"].append(
89                Thread(target=self.get_bucket_size, name="bucket_size")
90            )
91            self._task["threads"].append(
92                Thread(target=self.rebalance_progress, name="rebalance_progress")
93            )
94            if ddoc is not None:
95                self._task["threads"].append(
96                    Thread(target=self.indexing_time_stats, name="index_time", args=(ddoc, ))
97                )
98                self._task["threads"].append(
99                    Thread(target=self.indexing_throughput_stats, name="index_thr")
100                )
101            if clusters:
102                self.clusters = clusters
103                self._task["threads"].append(
104                    Thread(target=self.xdcr_lag_stats, name="xdcr_lag_stats")
105                )
106
107            for thread in self._task["threads"]:
108                thread.daemon = True
109                thread.start()
110
111            # Getting build/machine stats from only one node in the cluster
112            self.build_stats(nodes)
113            self.machine_stats(nodes)
114
115            # Start atop
116            self.start_atop()
117
118    def stop(self):
119        self.stop_atop()
120        self._task["state"] = "stopped"
121        for t in self._task["threads"]:
122            t.join(120)
123            if t.is_alive():
124                log.error("failed to join {0} thread".format(t.name))
125
126        self._task["time"] = time.time() - self._task["time"]
127
128    def sample(self, cur):
129        pass
130
131    def get_ns_servers_samples(self, metric):
132        for subset in self._task["ns_server_data"]:
133            samples = subset["op"]["samples"][metric]
134            yield float(sum(samples)) / len(samples)
135
136    def calc_xperf_stats(self):
137        metrics = ("replication_changes_left", "xdc_ops")
138        for metric in metrics:
139            self._xdcr_stats["avg_" + metric] = \
140                sum(self.get_ns_servers_samples(metric)) /\
141                sum(1 for _ in self.get_ns_servers_samples(metric))
142
143    def export(self, name, test_params):
144        for latency in self._task["latency"].keys():
145            # save the last histogram snapshot
146            per_90th_tot = 0
147            histos = self._task["latency"].get(latency, [])
148            if histos:
149                key = latency + "-histogram"
150                self._task["latency"][key] = histos[-1].copy()
151                del self._task["latency"][key]["delta"]
152                self._task["latency"][key]["client_id"] = self.client_id
153
154            # calculate percentiles
155            key = 'percentile-' + latency
156            self._task["latency"][key] = []
157            for histo in histos:
158                # for every sample histogram, produce a temp summary:
159                # temp = [90 per, 95 per, 99 per, client_id, delta]
160                temp = []
161                time = histo['time']
162                delta = histo['delta']
163                del histo['delta'], histo['time']
164                p = histo_percentile(histo, [0.80, 0.90, 0.95, 0.99, 0.999])
165                # p is list of tuples
166                for val in p:
167                    temp.append(val[-1])
168                per_90th_tot += temp[1]
169                temp.append(self.client_id)
170                temp.append(time)
171                temp.append(delta)
172                self._task["latency"][key].append(temp)
173            if per_90th_tot:
174                self._lat_avg_stats["%s-90th-avg" % latency] \
175                    = per_90th_tot / len(histos) * 1000000
176
177        # XDCR stats
178        try:
179            self.calc_xperf_stats()
180        except KeyError:
181            pass
182
183        test_params.update(self._xdcr_stats)
184        test_params.update(self._reb_stats)
185        test_params.update(self._lat_avg_stats)
186
187        obj = {
188            "buildinfo": self._task.get("buildstats", {}),
189            "machineinfo": self._task.get("machinestats", {}),
190            "membasestats": self._task.get("membasestats", []),
191            "systemstats": self._task.get("systemstats", []),
192            "iostats": self._task.get("iostats", []),
193            "name": name,
194            "totalops": self._task["totalops"],
195            "ops": self._task["ops"],
196            "time": self._task["time"],
197            "info": test_params,
198            "ns_server_data": self._task.get("ns_server_stats", []),
199            "ns_server_data_system": self._task.get("ns_server_stats_system", []),
200            "view_info": self._task.get("view_info", []),
201            "indexer_info": self._task.get("indexer_info", []),
202            "xdcr_lag": self._task.get("xdcr_lag", []),
203            "rebalance_progress": self._task.get("rebalance_progress", []),
204            "timings": self._task.get("timings", []),
205            "dispatcher": self._task.get("dispatcher", []),
206            "bucket-size": self._task.get("bucket_size", []),
207            "data-size": self._task.get("data_size_stats", []),
208            "latency-set-histogram": self._task["latency"].get("latency-set-histogram", []),
209            "latency-set": self._task["latency"].get('percentile-latency-set', []),
210            "latency-set-recent": self._task["latency"].get('percentile-latency-set-recent', []),
211            "latency-get-histogram": self._task["latency"].get("latency-get-histogram", []),
212            "latency-get": self._task["latency"].get('percentile-latency-get', []),
213            "latency-get-recent": self._task["latency"].get('percentile-latency-get-recent', []),
214            "latency-delete": self._task["latency"].get('percentile-latency-delete', []),
215            "latency-delete-recent": self._task["latency"].get('percentile-latency-delete-recent', []),
216            "latency-query-histogram": self._task["latency"].get("latency-query-histogram", []),
217            "latency-query": self._task["latency"].get('percentile-latency-query', []),
218            "latency-query-recent": self._task["latency"].get('percentile-latency-query-recent', []),
219            "latency-obs-persist-server-histogram": self._task["latency"].get("latency-obs-persist-server-histogram", []),
220            "latency-obs-persist-server": self._task["latency"].get('percentile-latency-obs-persist-server-server', []),
221            "latency-obs-persist-server-recent": self._task["latency"].get('percentile-latency-obs-persist-server-recent', []),
222            "latency-obs-persist-client-histogram": self._task["latency"].get("latency-obs-persist-client-histogram", []),
223            "latency-obs-persist-client": self._task["latency"].get('percentile-latency-obs-persist-client', []),
224            "latency-obs-persist-client-recent": self._task["latency"].get('percentile-latency-obs-persist-client-recent', []),
225            "latency-obs-repl-client-histogram": self._task["latency"].get("latency-obs-repl-client-histogram", []),
226            "latency-obs-repl-client": self._task["latency"].get('percentile-latency-obs-repl-client', []),
227            "latency-obs-repl-client-recent": self._task["latency"].get('percentile-latency-obs-repl-client-recent', []),
228            "latency-woq-obs-histogram": self._task["latency"].get("latency-woq-obs-histogram", []),
229            "latency-woq-obs": self._task["latency"].get('percentile-latency-woq-obs', []),
230            "latency-woq-obs-recent": self._task["latency"].get('percentile-latency-woq-obs-recent', []),
231            "latency-woq-query-histogram": self._task["latency"].get("latency-woq-query-histogram", []),
232            "latency-woq-query": self._task["latency"].get('percentile-latency-woq-query', []),
233            "latency-woq-query-recent": self._task["latency"].get('percentile-latency-woq-query-recent', []),
234            "latency-woq-histogram": self._task["latency"].get("latency-woq-histogram", []),
235            "latency-woq": self._task["latency"].get('percentile-latency-woq', []),
236            "latency-woq-recent": self._task["latency"].get('percentile-latency-woq-recent', []),
237            "latency-cor-histogram": self._task["latency"].get("latency-cor-histogram", []),
238            "latency-cor": self._task["latency"].get('percentile-latency-cor', []),
239            "latency-cor-recent": self._task["latency"].get('percentile-latency-cor-recent', [])}
240
241        if self.client_id:
242            patterns = ('reload$', 'load$', 'warmup$', 'index$')
243            phases = ('.reload', '.load', '.warmup', '.index')
244            name_picker = lambda (pattern, phase): re.search(pattern, self._task["name"])
245            try:
246                phase = filter(name_picker, zip(patterns, phases))[0][1]
247            except IndexError:
248                phase = '.loop'
249            name = str(self.client_id) + phase
250
251        file = gzip.open("{0}.json.gz".format(name), 'wb')
252        file.write(json.dumps(obj))
253        file.close()
254
255    def get_bucket_size(self, interval=60):
256        self._task["bucket_size"] = []
257        retries = 0
258        nodes_iterator = (node for node in self.nodes)
259        node = nodes_iterator.next()
260        rest = RestConnection(node)
261        while not self._aborted():
262            time.sleep(interval)
263            log.info("collecting bucket size stats")
264            try:
265                status, db_size = rest.get_database_disk_size(self.bucket)
266                if status:
267                    self._task["bucket_size"].append(db_size)
268            except IndexError, e:
269                retries += 1
270                log.error("unable to get bucket size {0}: {1}"
271                          .format(self.bucket, e))
272                log.warning("retries: {0} of {1}".format(retries, RETRIES))
273                if retries == RETRIES:
274                    try:
275                        node = nodes_iterator.next()
276                        rest = RestConnection(node)
277                        retries = 0
278                    except StopIteration:
279                        log.error("no nodes available: stop collecting bucket_size")
280                        return
281
282        log.info("finished bucket size stats")
283
284    def get_data_file_size(self, nodes, interval, bucket):
285        shells = []
286        for node in nodes:
287            try:
288                shells.append(RemoteMachineShellConnection(node))
289            except Exception as error:
290                log.error(error)
291        paths = []
292        if shells[0].is_couchbase_installed():
293            bucket_path = self.data_path + '/{0}'.format(bucket)
294            paths.append(bucket_path)
295            view_path = bucket_path + '/set_view_{0}_design'.format(bucket)
296            paths.append(view_path)
297        else:
298            paths.append(self.data_path + '/{0}-data'.format(bucket))
299
300        d = {"snapshots": []}
301        start_time = str(self._task["time"])
302
303        while not self._aborted():
304            time.sleep(interval)
305            current_time = time.time()
306            i = 0
307            for shell in shells:
308                node = nodes[i]
309                unique_id = node.ip + '-' + start_time
310                value = {}
311                for path in paths:
312                    size = shell.get_data_file_size(path)
313                    value["file"] = path.split('/')[-1]
314                    value["size"] = size
315                    value["unique_id"] = unique_id
316                    value["time"] = current_time
317                    value["ip"] = node.ip
318                    d["snapshots"].append(value.copy())
319                i += 1
320        self._task["data_size_stats"] = d["snapshots"]
321        log.info("finished data_size_stats")
322
323    #ops stats
324    #{'tot-sets': 899999, 'tot-gets': 1, 'tot-items': 899999, 'tot-creates': 899999}
325    def ops_stats(self, ops_stat):
326        ops_stat["time"] = time.time()
327        self._task["ops-temp"].append(ops_stat)
328        if len(self._task["ops-temp"]) >= 500 * (1 + self.active_mergers):
329            # Prevent concurrent merge
330            while self.active_mergers:
331                time.sleep(0.1)
332
333            # Semaphore: +1 active
334            self.active_mergers += 1
335
336            # Merge
337            merged = self._merge()
338            self._task["ops"].append(merged)
339            self._task["ops-temp"] = self._task["ops-temp"][500:]
340
341            # Semaphore: -1 active
342            self.active_mergers -= 1
343
344        #if self._task["ops"] has more than 1000 elements try to aggregate them ?
345
346    def latency_stats(self, latency_cmd, latency_stat, cur_time=0):
347        if self._task["latency"].get(latency_cmd) is None:
348            self._task["latency"][latency_cmd] = []
349        temp_latency_stat = latency_stat.copy()
350        if not cur_time:
351            cur_time = time.time()
352        temp_latency_stat['time'] = int(cur_time)
353        temp_latency_stat['delta'] = cur_time - self._task['time']
354        self._task["latency"][latency_cmd].append(temp_latency_stat)
355
356    def _merge(self):
357        first = self._task["ops-temp"][0]
358        merged = {"startTime": first["start-time"]}
359        totalgets = 0
360        totalsets = 0
361        totalqueries = 0
362        delta = 0
363        for i in range(499):
364            current = self._task["ops-temp"][i]
365            next = self._task["ops-temp"][i + 1]
366            totalgets += current["tot-gets"]
367            totalsets += current["tot-sets"]
368            totalqueries += current["tot-queries"]
369            delta += (next["start-time"] - current["start-time"])
370        merged["endTime"] = merged["startTime"] + delta
371        merged["totalSets"] = totalsets
372        merged["totalGets"] = totalgets
373        merged["totalQueries"] = totalqueries
374        qps = totalqueries / float(delta)
375        merged["queriesPerSec"] = qps
376        return merged
377
378    def total_stats(self, ops_stat):
379        ops_stat["time"] = time.time()
380        self._task["totalops"].append(ops_stat)
381
382    def build_stats(self, nodes):
383        json_response = StatUtil.build_info(nodes[0])
384        self._task["buildstats"] = json_response
385
386    def machine_stats(self, nodes):
387        machine_stats = StatUtil.machine_info(nodes[0])
388        self._task["machinestats"] = machine_stats
389
390    def reb_stats(self, start, dur):
391        log.info("recording reb start = {0}, reb duration = {1}".format(start, dur))
392        self._reb_stats["reb_start"] = start
393        self._reb_stats["reb_dur"] = dur
394
395    def _extract_proc_info(self, shell, pid):
396        output, error = shell.execute_command("cat /proc/{0}/stat".format(pid))
397        fields = (
398            'pid', 'comm', 'state', 'ppid', 'pgrp', 'session', 'tty_nr',
399            'tpgid', 'flags', 'minflt', 'cminflt', 'majflt', 'cmajflt',
400            'utime', 'stime', 'cutime', 'cstime', 'priority ' 'nice',
401            'num_threads', 'itrealvalue', 'starttime', 'vsize', 'rss',
402            'rsslim', 'startcode', 'endcode', 'startstack', 'kstkesp',
403            'kstkeip', 'signal', 'blocked ', 'sigignore', 'sigcatch', 'wchan',
404            'nswap', 'cnswap', 'exit_signal', 'processor', 'rt_priority',
405            'policy', 'delayacct_blkio_ticks', 'guest_time', 'cguest_time')
406
407        return {} if error else dict(zip(fields, output[0].split(' ')))
408
409    def _extract_io_info(self, shell):
410        """
411        Extract info from iostat
412
413        Output:
414
415        [kB_read, kB_wrtn, %util, %iowait, %idle]
416
417        Rough Benchmarks:
418        My local box (WIFI LAN - VM), took ~1.2 sec for this routine
419        """
420        CMD = "iostat -dk | grep 'sd. ' | " \
421              "awk '{read+=$5; write+=$6} END { print read, write }'"
422        out, err = shell.execute_command(CMD)
423        results = out[0]
424
425        CMD = "iostat -dkx | grep 'sd. ' | "\
426              "awk '{util+=$12} END { print util/NR }'"
427        out, err = shell.execute_command(CMD)
428        results = "%s %s" % (results, out[0])
429
430        CMD = "iostat 1 2 -c | awk 'NR == 7 { print $4, $6 }'"
431        out, err = shell.execute_command(CMD)
432        results = "%s %s" % (results, out[0])
433
434        return results.split(' ')
435
436    def system_stats(self, pnames, interval=10):
437        shells = []
438        for node in self.nodes:
439            try:
440                shells.append(RemoteMachineShellConnection(node))
441            except Exception, error:
442                log.error(error)
443        d = {"snapshots": []}
444        #        "pname":"x","pid":"y","snapshots":[{"time":time,"value":value}]
445
446        start_time = str(self._task["time"])
447        while not self._aborted():
448            time.sleep(interval)
449            current_time = time.time()
450            i = 0
451            for shell in shells:
452                node = self.nodes[i]
453                unique_id = node.ip + '-' + start_time
454                for pname in pnames:
455                    obj = RemoteMachineHelper(shell).is_process_running(pname)
456                    if obj and obj.pid:
457                        value = self._extract_proc_info(shell, obj.pid)
458                        value["name"] = pname
459                        value["id"] = obj.pid
460                        value["unique_id"] = unique_id
461                        value["time"] = current_time
462                        value["ip"] = node.ip
463                        d["snapshots"].append(value)
464                i += 1
465        self._task["systemstats"] = d["snapshots"]
466        log.info("finished system_stats")
467
468    def iostats(self, interval=10):
469        shells = []
470        for node in self.nodes:
471            try:
472                shells.append(RemoteMachineShellConnection(node))
473            except Exception, error:
474                log.error(error)
475
476        self._task["iostats"] = []
477
478        log.info("started capturing io stats")
479
480        while not self._aborted():
481            time.sleep(interval)
482            log.info("collecting io stats")
483            for shell in shells:
484                try:
485                    kB_read, kB_wrtn, util, iowait, idle = \
486                        self._extract_io_info(shell)
487                except (ValueError, TypeError, IndexError):
488                    continue
489                if kB_read and kB_wrtn:
490                    self._task["iostats"].append({"time": time.time(),
491                                                 "ip": shell.ip,
492                                                 "read": kB_read,
493                                                 "write": kB_wrtn,
494                                                 "util": util,
495                                                 "iowait": iowait,
496                                                 "idle": idle})
497        log.info("finished capturing io stats")
498
499    def capture_mb_snapshot(self, node):
500        """Capture membase stats snapshot manually"""
501        log.info("capturing memcache stats snapshot for {0}".format(node.ip))
502        stats = {}
503
504        try:
505            bucket = RestConnection(node).get_buckets()[0].name
506            mc = MemcachedClientHelper.direct_client(node, bucket)
507            stats = mc.stats()
508            stats.update(mc.stats("warmup"))
509        except Exception, e:
510            log.error(e)
511            return False
512        finally:
513            stats["time"] = time.time()
514            stats["ip"] = node.ip
515            self._mb_stats["snapshots"].append(stats)
516            print stats
517
518        log.info("memcache stats snapshot captured")
519        return True
520
521    def membase_stats(self, interval=60):
522        mcs = []
523        for node in self.nodes:
524            try:
525                bucket = RestConnection(node).get_buckets()[0].name
526                mc = MemcachedClientHelper.direct_client(node, bucket)
527                mcs.append(mc)
528            except Exception, error:
529                log.error(error)
530        self._task["membasestats"] = []
531        self._task["timings"] = []
532        self._task["dispatcher"] = []
533        data = dict()
534        for mc in mcs:
535            data[mc.host] = {"snapshots": [], "timings": [], "dispatcher": []}
536
537        while not self._aborted():
538            time.sleep(interval)
539            log.info("collecting membase stats")
540            for mc in mcs:
541                for rerty in xrange(RETRIES):
542                    try:
543                        stats = mc.stats()
544                    except Exception as e:
545                        log.warn("{0}, retries = {1}".format(str(e), rerty))
546                        time.sleep(2)
547                        mc.reconnect()
548                    else:
549                        break
550                else:
551                    stats = {}
552                data[mc.host]["snapshots"].append(stats)
553
554                for arg in ("timings", "dispatcher"):
555                    try:
556                        stats = mc.stats(arg)
557                        data[mc.host][arg].append(stats)
558                    except EOFError, e:
559                        log.error("unable to get {0} stats {1}: {2}"
560                                  .format(arg, mc.host, e))
561
562        for host in (mc.host for mc in mcs):
563            unique_id = host + '-' + str(self._task["time"])
564            current_time = time.time()
565
566            if self._mb_stats["snapshots"]:  # use manually captured stats
567                self._task["membasestats"] = self._mb_stats["snapshots"]
568            else:  # use periodically captured stats
569                for snapshot in data[host]["snapshots"]:
570                    snapshot["unique_id"] = unique_id
571                    snapshot["time"] = current_time
572                    snapshot["ip"] = host
573                    self._task["membasestats"].append(snapshot)
574
575            for timing in data[host]["timings"]:
576                timing["unique_id"] = unique_id
577                timing["time"] = current_time
578                timing["ip"] = host
579                self._task["timings"].append(timing)
580
581            for dispatcher in data[host]["dispatcher"]:
582                dispatcher["unique_id"] = unique_id
583                dispatcher["time"] = current_time
584                dispatcher["ip"] = host
585                self._task["dispatcher"].append(dispatcher)
586
587            if data[host]["timings"]:
588                log.info("dumping disk timing stats: {0}".format(host))
589                latests_timings = data[host]["timings"][-1]
590                for key, value in sorted(latests_timings.iteritems()):
591                    if key.startswith("disk"):
592                        print "{0:50s}: {1}".format(key, value)
593
594        log.info("finished membase_stats")
595
596    def ns_server_stats(self, interval=60):
597        self._task["ns_server_stats"] = []
598        self._task["ns_server_stats_system"] = []
599        nodes_iterator = (node for node in self.nodes)
600        node = nodes_iterator.next()
601        retries = 0
602        not_null = lambda v: v if v is not None else 0
603
604        rest = RestConnection(node)
605        while not self._aborted():
606            time.sleep(interval)
607            log.info("collecting ns_server_stats")
608            try:
609                # Bucket stats
610                ns_server_stats = rest.fetch_bucket_stats(bucket=self.bucket)
611                for key, value in ns_server_stats["op"]["samples"].iteritems():
612                    ns_server_stats["op"]["samples"][key] = not_null(value)
613                self._task["ns_server_stats"].append(ns_server_stats)
614                # System stats
615                ns_server_stats_system = rest.fetch_system_stats()
616                self._task["ns_server_stats_system"].append(ns_server_stats_system)
617            except ServerUnavailableException, e:
618                log.error(e)
619            except (ValueError, TypeError), e:
620                log.error("unable to parse json object {0}: {1}".format(node, e))
621            else:
622                continue
623            retries += 1
624            if retries <= RETRIES:
625                log.warning("retries: {0} of {1}".format(retries, RETRIES))
626            else:
627                try:
628                    node = nodes_iterator.next()
629                    rest = RestConnection(node)
630                    retries = 0
631                except StopIteration:
632                    log.error("no nodes available: stop collecting ns_server_stats")
633                    return
634
635        log.info("finished ns_server_stats")
636
637    def indexing_time_stats(self, ddoc, interval=60):
638        """Collect view indexing stats"""
639        self._task['view_info'] = list()
640
641        rests = [RestConnection(node) for node in self.nodes]
642        while not self._aborted():
643            time.sleep(interval)
644            log.info("collecting view indexing stats")
645            for rest in rests:
646                try:
647                    data = rest.set_view_info(self.bucket, ddoc)
648                except (SetViewInfoNotFound, ServerUnavailableException), error:
649                    log.error(error)
650                    continue
651                try:
652                    update_history = data[1]['stats']['update_history']
653                    indexing_time = \
654                        [event['indexing_time'] for event in update_history]
655                    avg_time = sum(indexing_time) / len(indexing_time)
656                except (IndexError, KeyError, ValueError):
657                    avg_time = 0
658                finally:
659                    self._task['view_info'].append({'node': rest.ip,
660                                                    'indexing_time': avg_time,
661                                                    'timestamp': time.time()})
662
663        log.info("finished collecting view indexing stats")
664
665    def indexing_throughput_stats(self, interval=15):
666        self._task['indexer_info'] = list()
667        indexers = defaultdict(dict)
668        rests = [RestConnection(node) for node in self.nodes]
669        while not self._aborted():
670            time.sleep(interval)  # 15 seconds by default
671
672            # Grab indexer tasks from all nodes
673            tasks = list()
674            for rest in rests:
675                try:
676                    active_tasks = rest.active_tasks()
677                except ServerUnavailableException, error:
678                    log.error(error)
679                    continue
680                indexer_tasks = filter(lambda t: t['type'] == 'indexer',
681                                       active_tasks)
682                tasks.extend(indexer_tasks)
683
684            # Calculate throughput for every unique PID
685            thr = 0
686            for task in tasks:
687                uiid = task['pid'] + str(task['started_on'])
688
689                changes_delta = \
690                    task['changes_done'] - indexers[uiid].get('changes_done', 0)
691                time_delta = \
692                    task['updated_on'] - indexers[uiid].get('updated_on',
693                                                            task['started_on'])
694                if time_delta:
695                    thr += changes_delta / time_delta
696                indexers[uiid]['changes_done'] = task['changes_done']
697                indexers[uiid]['updated_on'] = task['updated_on']
698
699            # Average throughput
700            self._task['indexer_info'].append({
701                'indexing_throughput': thr,
702                'timestamp': time.time()
703            })
704
705    def _get_xdcr_latency(self, src_client, dst_client, multi=False):
706        PREFIX = "xdcr_track_"
707        kvs = dict((PREFIX + hex(), hex()) for _ in xrange(10))
708        key = PREFIX + hex()
709        persisted = False
710
711        t0 = t1 = time.time()
712        if multi:
713            src_client.setMulti(0, 0, kvs)
714            while True:
715                try:
716                    dst_client.getMulti(kvs.keys(), timeout_sec=120,
717                                        parallel=False)
718                    break
719                except ValueError:
720                    time.sleep(0.05)
721        else:
722            src_client.set(key, 0, 0, key)
723            while not persisted:
724                _, _, _, persisted, _ = src_client.observe(key)
725            t1 = time.time()
726            while time.time() - t1 < 300:  # 5 minutes timeout
727                try:
728                    dst_client.get(key)
729                    break
730                except:
731                    time.sleep(0.05)
732        total_time = (time.time() - t0) * 1000
733        persist_time = (t1 - t0) * 1000
734
735        if multi:
736            return {"multi_100_xdcr_lag": total_time}
737        else:
738            return {
739                "xdcr_lag": total_time,
740                "xdcr_persist_time": persist_time,
741                "xdcr_diff": total_time - persist_time,
742                "timestamp": time.time()
743            }
744
745    def xdcr_lag_stats(self, interval=5):
746        master = self.clusters[0][0]
747        slave = self.clusters[1][0]
748        src_client = VBucketAwareMemcached(RestConnection(master), self.bucket)
749        dst_client = VBucketAwareMemcached(RestConnection(slave), self.bucket)
750
751        log.info("started xdcr lag measurements")
752        self._task["xdcr_lag"] = list()
753        while not self._aborted():
754            single_stats = self._get_xdcr_latency(src_client, dst_client)
755            multi_stats = self._get_xdcr_latency(src_client, dst_client, True)
756            multi_stats.update(single_stats)
757            self._task['xdcr_lag'].append(multi_stats)
758            time.sleep(interval)
759
760        filename = time.strftime("%Y%m%d_%H%M%S_xdcr_lag.json",
761                                 time.localtime())
762        with open(filename, "w") as fh:
763            fh.write(json.dumps(self._task['xdcr_lag'],
764                                indent=4, sort_keys=True))
765        log.info("finished xdcr lag measurements")
766
767    def rebalance_progress(self, interval=15):
768        self._task["rebalance_progress"] = list()
769        nodes = cycle(self.nodes)
770        rest = RestConnection(nodes.next())
771        while not self._aborted():
772            try:
773                tasks = rest.ns_server_tasks()
774            except ServerUnavailableException, error:
775                log.error(error)
776                rest = RestConnection(nodes.next())
777                continue
778            for task in tasks:
779                if task["type"] == "rebalance":
780                    self._task["rebalance_progress"].append({
781                        "rebalance_progress": task.get("progress", 0),
782                        "timestamp": time.time()
783                    })
784                    break
785            time.sleep(interval)
786        log.info("finished active_tasks measurements")
787
788    def _aborted(self):
789        return self._task["state"] == "stopped"
790
791    def start_atop(self):
792        """Start atop collector"""
793        for node in self.nodes:
794            try:
795                shell = RemoteMachineShellConnection(node)
796            except SystemExit:
797                log.error("can't establish SSH session with {0}".format(node.ip))
798            else:
799                cmd = "killall atop; rm -fr /tmp/*.atop;" + \
800                    "atop -w /tmp/{0}.atop -a 15".format(node.ip) + \
801                    " > /dev/null 2> /dev.null < /dev/null &"
802                shell.execute_command(cmd)
803
804    def stop_atop(self):
805        """Stop atop collector"""
806        for node in self.nodes:
807            try:
808                shell = RemoteMachineShellConnection(node)
809            except SystemExit:
810                log.error("can't establish SSH session with {0}".format(node.ip))
811            else:
812                shell.execute_command("killall atop")
813
814class CallbackStatsCollector(StatsCollector):
815
816    """Invokes optional callback when registered levels have been reached
817    during stats sample()'ing."""
818
819    def __init__(self, verbosity):
820        # Tuples of level_name, level, callback.
821        self.level_callbacks = []
822        super(CallbackStatsCollector, self).__init__(verbosity)
823
824    def sample(self, cur):
825        for level_name, level, callback in self.level_callbacks:
826            if level < cur.get(level_name, -1):
827                callback(cur)
828
829        return super(CallbackStatsCollector, self).sample(cur)
830
831
832class StatUtil(object):
833
834    @staticmethod
835    def build_info(node):
836        rest = RestConnection(node)
837        api = rest.baseUrl + 'nodes/self'
838        status, content, header = rest._http_request(api)
839        json_parsed = json.loads(content)
840        return json_parsed
841
842    @staticmethod
843    def machine_info(node):
844        shell = RemoteMachineShellConnection(node)
845        info = shell.extract_remote_info()
846        return {"type": info.type, "distribution": info.distribution_type,
847                "version": info.distribution_version, "ram": info.ram,
848                "cpu": info.cpu, "disk": info.disk, "hostname": info.hostname}
849