1"""pushes stats from serieslydb to CBFS_HOST
2
This module works by collecting stats from the seriesly database specified in testcfg.py
and furthermore uses the pandas (python data-analysis) module to store the stats into a dataframe
that is compatible with version comparisons in the report generator.  Once in-memory as a
dataframe the stats are dumped to a csv file, compressed and pushed to CBFS.
7
8usage: push_stats.py [-h] --spec <dir>/<test.js>
9                          --version VERSION
10                          --build BUILD
11                          [--name NAME]
12                          [--cluster default]
13
14Where spec name is a required argument specifying the file used to generate stats.
15
16"""
17
18import sys
19sys.path.append(".")
20import argparse
21import json
22import gzip
23import testcfg as cfg
24import pandas as pd
25import os
26import shutil
27from seriesly import Seriesly, exceptions
28import requests
29
# cbfs
# base url of the CBFS server that receives the stat archives
CBFS_HOST = 'http://10.5.0.128:8484'

# archives array for keeping track of files to push (archive) into cbfs
archives = []

# setup parser
parser = argparse.ArgumentParser(description='CB System Test Stat Pusher')
parser.add_argument("--spec", help="path to json file used in test", metavar="<dir>/<test.js>", required = True)
parser.add_argument("--version",  help="couchbase version.. ie (2.2.0)", required = True)
parser.add_argument("--build",  help="build number", required = True)
parser.add_argument("--name", default=None, help="use to override name in test spec")
parser.add_argument("--cluster", default="default", help="should match whatever you set for CB_CLUSTER_TAG")

## connect to seriesly
# NOTE: connects at import time; SERIESLY_IP comes from testcfg.py
conn = Seriesly(cfg.SERIESLY_IP, 3133)
46
47
48
def getDBData(db):
  """Retrieve all timeseries documents from the named seriesly database.

  Args:
    db: name of the seriesly database to read.

  Returns:
    dict of timestamp -> stats, trimmed to the last test run via
    stripData(), or None when the database holds no samples.

  Exits the process when the database does not exist (cbmonitor is
  presumably not running).
  """
  data = None

  try:
    # keep the name in 'db' so the error message below stays readable
    data = conn[db].get_all()
    # drop samples collected before the most recent test started
    data = stripData(data)
  except exceptions.NotExistingDatabase:
    print("DB Not found: %s" % db)
    print("cbmonitor running?")
    sys.exit(-1)

  # callers treat "no samples" as None
  return data if data else None
67
def stripData(data):
  """Trim raw samples to only those from the most recent test run.

  Uses the first entry of the event db as the test start time and
  discards any sample whose timestamp key precedes it.

  Args:
    data: dict of timestamp -> stats as returned by seriesly get_all().

  Returns:
    dict restricted to timestamps >= the first event, or the input
    unchanged when no event data is available.
  """
  ev, _ = getSortedEventData()

  if ev is None:
    return data

  start_time = ev[0]

  # keep only samples at or after the start of the test
  return dict((ts, sample) for ts, sample in data.items()
              if ts >= start_time)
89
def getSortedEventData():
  """Fetch the 'event' db contents sorted by timestamp.

  Returns:
    (keys, data) tuple: sorted timestamp keys and the matching
    documents; (None, None) when the event db does not exist, and
    (None, {}) when it exists but is empty.
  """
  keys = None
  data = None

  if 'event' not in conn.list_dbs():
    print("warning: eventdb not found in seriesly db")
    return keys, data

  data = conn['event'].get_all()
  if len(data) == 0:
    print("warning: eventdb exists but is empty")
    return keys, data

  keys, data = sortDBData(data)
  return keys, data
103
104
def get_query_params(start_time):
  """Build seriesly query parameters selecting data from *start_time* on."""
  return {
      "group": 10000,
      "reducer": "identity",
      "from": start_time,
      "ptr": "",
  }
112
113
def sortDBData(data):
  """Sort seriesly documents by their timestamp keys.

  Args:
    data: dict of timestamp -> document (may be None or empty).

  Returns:
    (keys, values): timestamp keys in ascending order plus the
    documents in the same order; ([], []) for empty/None input.
  """
  # sorted(dict) iterates keys on both py2 and py3 (iterkeys() is py2-only)
  keys = sorted(data) if data else []
  return keys, [data[ts] for ts in keys]
128
def getSortedDBData(db):
  """Fetch the named db's stats and return them sorted by timestamp."""
  raw = getDBData(db)
  return sortDBData(raw)
131
132"""
133" make a timeseries dataframe
134"""
135def _createDataframe(index, data):
136
137  df = None
138
139  try:
140
141    if(data):
142      df = pd.DataFrame(data)
143      df.index = index
144
145  except ValueError as ex:
146    print "unable to create dataframe: has incorrect format"
147    raise Exception(ex)
148
149  return df
150
def createDataframe(db):
  """Get stats from seriesly and convert to a 2d timeseries dataframe.

  Rows are timestamps, columns are individual stats.

  Args:
    db: name of the seriesly database.

  Returns:
    pd.DataFrame, or None when the db holds no data.
  """
  df = None
  data = getDBData(db)

  if data:
    # sort the single fetch instead of calling getSortedDBData(db),
    # which would re-query (and re-strip) the whole database
    index, rows = sortDBData(data)
    df = _createDataframe(index, rows)
  else:
    print("WARNING: stat db %s is empty!" % db)

  return df
165
166
def storePhase(ns_dataframe, version, test, build, bucket):
  """Split a stat dataframe into per-phase csv files.

  Phase boundaries come from the event db; when no events exist the
  whole dataframe is stored as a single phase 0.

  Args:
    ns_dataframe: timeseries dataframe of stats (indexed by timestamp).
    version, test, build, bucket: components of the result directory path.
  """
  path = "system-test-results/%s/%s/%s/%s" % (version, test, build, bucket)
  print("Generating stats: %s" % path)

  event_idx, _ = getSortedEventData()
  if event_idx is None:
    print("storing all data in single phase")
    dataframeToCsv(ns_dataframe, path, test, 0)
    return

  # slice the dataframe between consecutive event timestamps
  last = len(event_idx) - 1
  for i in range(len(event_idx)):
    if i == last:
      # final phase: everything after the last event.  Checking this
      # first also covers the single-event case, which previously
      # crashed reading event_idx[i+1]
      phase_dataframe = ns_dataframe[ns_dataframe.index > event_idx[i]]
    elif i == 0:
      phase_dataframe = ns_dataframe[ns_dataframe.index < event_idx[i + 1]]
    else:
      phase_dataframe = ns_dataframe[(ns_dataframe.index < event_idx[i + 1]) &
                                     (ns_dataframe.index > event_idx[i])]
    dataframeToCsv(phase_dataframe, path, test, i)
193
def dataframeToCsv(dataframe, path, test, phase_no):
    """Dump a phase dataframe to csv, gzip it and queue it for upload.

    The uncompressed csv is removed afterwards; the .gz path is added to
    the module-level archives list consumed by pushStats().
    """
    ph_csv = "%s/%s_phase%s.csv" % (path, test, phase_no)
    ph_csv_gz = "%s.gz" % ph_csv
    dataframe.to_csv(ph_csv)

    # compress; 'with' ensures both handles are closed before removal
    # (the originals were leaked)
    with open(ph_csv, 'rb') as src:
      with gzip.open(ph_csv_gz, 'wb') as dst:
        shutil.copyfileobj(src, dst)

    os.remove(ph_csv)
    archives.append(ph_csv_gz)
203
204
def generateStats(version, test, build, dbs):
  """Create and store per-phase stats for every collected db.

  Args:
    version, test, build: labels used to build the result path.
    dbs: list of DB records describing the seriesly dbs to dump.
  """
  for db in dbs:
    ns_dataframe = createDataframe('%s' % db.name)

    # 'if ns_dataframe:' raises ValueError -- the truth value of a
    # pandas DataFrame is ambiguous -- so compare against None
    if ns_dataframe is not None:
      storePhase(ns_dataframe, version, test, build, db.bucket)
212
213
def pushStats():
  """Upload every file queued in archives to CBFS via HTTP PUT."""
  for data_file in archives:
    url = '%s/%s' % (CBFS_HOST, data_file)
    print("Uploading: " + url)

    # choose content-type by extension: the only non-gzip upload is _info.js
    suffix = data_file.split('.')[-1]
    if suffix == 'js':
      headers = {'content-type': 'text/javascript'}
    else:
      headers = {'content-type': 'application/x-gzip'}

    # 'with' closes the handle (originally leaked) once the PUT completes
    with open(data_file, 'rb') as data:
      r = requests.put(url, data=data, headers=headers)
    print(r.text)
228
def mkdir(path):
  """Create an empty directory at path, clearing any existing contents.

  Args:
    path: directory path; recreated from scratch if it already exists.
  """
  # single makedirs call instead of duplicating it in both branches
  if os.path.exists(path):
    shutil.rmtree(path)
  os.makedirs(path)
235
def prepareEnv(version, test, build, dbs):
  """Create a clean results directory for every bucket in dbs."""
  paths = ["system-test-results/%s/%s/%s/%s" % (version, test, build, db.bucket)
           for db in dbs]
  for bucket_path in paths:
    mkdir(bucket_path)
241
242
243
def loadSpec(spec):
  """Parse the json test spec at the given path.

  Args:
    spec: path to the spec (.js/json) file.

  Returns:
    dict parsed from the file.

  Exits the process when the file is missing or is not valid json.
  """
  try:
    # 'with' guarantees the handle is closed (originally leaked)
    with open(spec) as f:
      return json.loads(f.read())
  except Exception as ex:
    print("Invalid test spec: " + str(ex))
    sys.exit(-1)
252
def setName(name, spec):
  """Resolve the test name, preferring the override over the spec.

  Falls back to spec['name'] when no override is given; exits when
  neither is available.  Spaces are replaced with underscores.
  """
  if name is None:
    try:
      name = str(spec['name'])
    except KeyError:
      print("test name missing from spec")
      sys.exit(-1)

  # remove spaces
  return name.replace(' ', '_')
265
def getDBs(cluster = 'default'):
  """Discover the seriesly databases to dump for the given cluster.

  Collects three groups: per-bucket ns_server dbs, atop dbs and
  latency dbs.  Exits when seriesly has no data at all or when no
  dbs match the cluster tag.

  Args:
    cluster: cluster tag; should match the CB_CLUSTER_TAG used at run time.

  Returns:
    list of DB records (prefix + full db name).
  """

  dbs = []

  if len(conn.list_dbs()) == 0:
    print "seriesly database is empty, check SERIESLY_IP in your testcfg.py"
    sys.exit(-1)

  # db names beginning with ns_server<cluster>
  bucket_dbs = [db_name for db_name in conn.list_dbs() if db_name.find('ns_server'+cluster)==0 ]


  for db in bucket_dbs:
    # filter out dbs with host ip/name attached
    # (a name kept here is a strict prefix of at least one other db name,
    # i.e. the bucket-level db rather than a per-node variant)
    if(len([bucket for bucket in bucket_dbs if bucket.find(db) == 0]) != 1):
      db = DB('ns_server', db)
      dbs.append(db)

  # atop dbs: names beginning with atop<cluster>
  atop_dbs = [db_name for db_name in conn.list_dbs() if db_name.find('atop'+cluster)==0]
  for db in atop_dbs:
    dbs.append(DB('atop'+cluster,db))

  # latency dbs: 'latency' appears in the name but NOT at position 0
  # (find() > 0, deliberately not startswith)
  latency_dbs = [db_name for db_name in conn.list_dbs() if db_name.find('latency') > 0]
  for db in latency_dbs:
    dbs.append(DB('',db))

  if(len(dbs) == 0):
    print "no bucket data in seriesly db"
    print "did you try with '--cluster %s' ?" % cfg.CB_CLUSTER_TAG
    sys.exit(-1)

  return dbs
297
def createInfoFile(version, test, build, dbs, specPath):
  """Write the _info.js manifest and queue it (plus the spec) for upload.

  The manifest lists the buckets, the spec file name and every archived
  stats file produced so far.

  Args:
    version, test, build: components of the result directory path.
    dbs: DB records whose buckets are listed in the manifest.
    specPath: path to the original spec file, copied alongside the manifest.
  """
  path = "system-test-results/%s/%s/%s" % (version, test, build)
  fname = '%s/_info.js' % path
  specName = specPath.split('/')[-1]

  info = {'buckets': [db.bucket for db in dbs],
          'spec': specName,
          'files': archives}

  # close (and flush) before pushStats() reads the file back for upload;
  # the original handle was never closed explicitly
  with open(fname, 'w') as f:
    f.write(json.dumps(info))

  # archive info for pushing to cbfs
  archives.append(fname)

  # archive spec for pushing to cbfs
  shutil.copy(specPath, path)
  archives.append("%s/%s" % (path, specName))
317
class DB(object):
  """A seriesly database reference.

  Attributes are:
    prefix -- db name prefix (e.g. 'ns_server'), stripped off the name
    name   -- full seriesly db name
    bucket -- name with prefix removed (the couchbase bucket portion)
  """

  def __init__(self, prefix, name):
    self.prefix = prefix
    self.name = name
    self.bucket = name[len(prefix):]

  def __repr__(self):
    # aid debugging of db discovery in getDBs()
    return "DB(prefix=%r, name=%r)" % (self.prefix, self.name)
323
def main():
  """Parse args, collect stats from seriesly and push them to CBFS."""
  args = parser.parse_args()

  specPath = args.spec
  spec = loadSpec(specPath)
  test = setName(args.name, spec)

  version = args.version
  build = args.build
  dbs = getDBs(args.cluster)

  prepareEnv(version, test, build, dbs)
  generateStats(version, test, build, dbs)
  createInfoFile(version, test, build, dbs, specPath)
  pushStats()

if __name__ == "__main__":
    main()
343