import subprocess, time, os
from threading import Thread
from clitest.cli_base import CliBaseTest
from remote.remote_util import RemoteMachineHelper,\
                               RemoteMachineShellConnection
from couchbase_helper.documentgenerator import BlobGenerator
from membase.api.rest_client import RestConnection, RestHelper
from testconstants import LOG_FILE_NAMES

from couchbase_helper.document import View

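# Reference list of logs cbcollect_info is expected to gather; the actual
# verification in verify_results checks against LOG_FILE_NAMES imported
# from testconstants.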
LOG_FILE_NAME_LIST = ["couchbase.log", "diag.log", "ddocs.log", "ini.log", "syslog.tar.gz",
                      "ns_server.couchdb.log", "ns_server.debug.log", "ns_server.babysitter.log",
                      "ns_server.error.log", "ns_server.info.log",
                      "ns_server.views.log", "stats.log",
                      "memcached.log", "ns_server.mapreduce_errors.log",
                      "ns_server.stats.log", "ns_server.xdcr_errors.log",
                      "ns_server.xdcr.log"]


class CollectinfoTests(CliBaseTest):

    def setUp(self):
        super(CollectinfoTests, self).setUp()
        self.log_filename = self.input.param("filename", "info")
        self.doc_ops = self.input.param("doc_ops", None)
        self.expire_time = self.input.param("expire_time", 5)
        self.value_size = self.input.param("value_size", 256)
        self.node_down = self.input.param("node_down", False)
        self.collect_all_option = self.input.param("collect-all-option", False)
        if self.doc_ops is not None:
            self.doc_ops = self.doc_ops.split(";")

    def tearDown(self):
        super(CollectinfoTests, self).tearDown()

    def collectinfo_test(self):
        """We use cbcollect_info to automatically collect the logs for a server node.

        First we load some items into the node. Optionally, some mutations are
        run against these items. Then we use cbcollect_info to automatically
        generate a zip file containing all the logs for the node. We verify
        that all the log files listed in LOG_FILE_NAMES are present and that
        stats.log contains stats for every bucket we have created."""

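        # Key-space layout for the mutation phases below: "create" loads
        # keys [0, num_items); "update" touches roughly the first half,
        # "expire" the third quarter, and "delete" the last quarter.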
        gen_load = BlobGenerator('nosql', 'nosql-', self.value_size,
                                 end=self.num_items)
        gen_update = BlobGenerator('nosql', 'nosql-', self.value_size,
                                   end=(self.num_items / 2 - 1))
        gen_expire = BlobGenerator('nosql', 'nosql-', self.value_size,
                                   start=self.num_items / 2,
                                   end=(self.num_items * 3 / 4 - 1))
        gen_delete = BlobGenerator('nosql', 'nosql-', self.value_size,
                                   start=self.num_items * 3 / 4,
                                   end=self.num_items)
        self._load_all_buckets(self.master, gen_load, "create", 0)

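        # Apply the optional mutations requested through the doc_ops param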
        if self.doc_ops is not None:
            if "update" in self.doc_ops:
                self._load_all_buckets(self.master, gen_update, "update", 0)
            if "delete" in self.doc_ops:
                self._load_all_buckets(self.master, gen_delete, "delete", 0)
            if "expire" in self.doc_ops:
                self._load_all_buckets(self.master, gen_expire, "update",
                                       self.expire_time)
                self.sleep(self.expire_time + 1)
        self._wait_for_stats_all_buckets(self.servers[:self.num_servers])

        self.shell.delete_files("%s.zip" % (self.log_filename))
        # This folder is generated after unzipping the log package
        self.shell.delete_files("cbcollect_info*")

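        # Optionally take the node down first so cbcollect_info is exercised
        # against a dead node (autofailover is disabled so the cluster does
        # not fail the node over in the meantime).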
        cb_server_started = False
        if self.node_down:
            # set autofailover to off
            rest = RestConnection(self.master)
            rest.update_autofailover_settings(False, 60)
            if self.os == 'linux':
                output, error = self.shell.execute_command(
                                    "killall -9 memcached & killall -9 beam.smp")
                self.shell.log_command_output(output, error)
        output, error = self.shell.execute_cbcollect_info("%s.zip"
                                                          % (self.log_filename))

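        # Scan the collector output for errors; complaints about the http
        # endpoint are expected when the node is down, so skip those lines.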
        if self.os != "windows":
            if len(error) > 0:
                raise Exception("Command threw an error: %s " % error)

            for output_line in output:
                if output_line.find("ERROR") >= 0 or output_line.find("Error") >= 0:
                    if "from http endpoint" in output_line.lower():
                        continue
                    raise Exception("Command threw an error: %s " % output_line)
        try:
            if self.node_down:
                if self.os == 'linux':
                    self.shell.start_server()
                    rest = RestConnection(self.master)
                    if RestHelper(rest).is_ns_server_running(timeout_in_seconds=60):
                        cb_server_started = True
                    else:
                        self.fail("CB server failed to start")
            self.verify_results(self.log_filename)
        finally:
            # make sure the server is restarted even if verification failed
            if self.node_down and not cb_server_started:
                if self.os == 'linux':
                    self.shell.start_server()
                    rest = RestConnection(self.master)
                    if not RestHelper(rest).is_ns_server_running(timeout_in_seconds=60):
                        self.fail("CB server failed to start")

    def test_cbcollectinfo_detect_container(self):
        """ This test only runs inside a docker host and detects
            whether a node is a docker container.
            It should run with param skip_init_check_cbserver=true """
        docker_id = None
        if "." in self.ip:
            self.fail("This test only runs in a docker host")
        elif self.ip is not None:
            docker_id = self.ip
            os.system("docker exec %s %scbcollect_info testlog.zip"
                                        % (docker_id, self.cli_command_path))
            os.system("docker cp %s:/testlog.zip ." % (docker_id))
            os.system("unzip testlog.zip")
            try:
                # grep exits non-zero when nothing matches, so treat that
                # as "not detected" instead of an error
                output = subprocess.check_output(
                    "cd cbcollect_info_*; grep 'docker' ./*", shell=True)
            except subprocess.CalledProcessError:
                output = ""
            if output and "docker" in output:
                self.log.info("cbcollect log detected docker container")
            else:
                self.fail("cbcollect info could not detect docker container")
        os.system("docker exec %s rm testlog.zip" % (docker_id))

    def test_not_collect_stats_hash_in_cbcollectinfo(self):
        """ This test verifies that we don't collect the stats hash
            when running cbcollectinfo.
            params: nodes_init=2
        """
        check_version = ["5.1.2", "5.5.1"]
        mesg = "memcached stats ['hash', 'detail']"
        # only 5.1.2, 5.5.1 and 6.0+ builds contain the fix under test
        if self.cb_version[:5] not in check_version \
                                  and float(self.cb_version[:3]) < 6.0:
            self.log.info("\nThis version {0} does not need to test {1}"\
                                          .format(self.cb_version, mesg))
            return
        self.shell.delete_files("{0}.zip".format(self.log_filename))
        # This folder is generated after unzipping the log package
        self.shell.delete_files("cbcollect_info*")
        output, error = self.shell.execute_cbcollect_info("%s.zip"
                                                          % (self.log_filename))
        if output:
            for x in output:
                if x.startswith(mesg):
                    self.fail("cbcollectinfo should not collect {0}".format(mesg))
        self.log.info("cbcollectinfo does not collect {0}".format(mesg))

    def verify_results(self, output_file_name):
        try:
            os_name = "linux"
            zip_file = "%s.zip" % (output_file_name)
            info = self.shell.extract_remote_info()
            node_type = info.type.lower()
            if node_type == 'windows':
                os_name = "windows"

            if os_name == "linux":
                command = "unzip %s" % (zip_file)
                output, error = self.shell.execute_command(command)
                self.sleep(2)
                if self.debug_logs:
                    self.shell.log_command_output(output, error)
                if len(error) > 0:
                    raise Exception("unable to unzip the files. Check unzip command output for help")

                command = "ls cbcollect_info*/"
                output, error = self.shell.execute_command(command)
                if self.debug_logs:
                    self.shell.log_command_output(output, error)
                if len(error) > 0:
                    raise Exception("unable to list the files. Check ls command output for help")
                missing_logs = False
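                # Service-specific logs only exist on nodes running that
                # service, so extend the expected list accordingly.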
                nodes_services = RestConnection(self.master).get_nodes_services()
                for node, services in nodes_services.iteritems():
                    for service in services:
                        if service.encode("ascii") == "fts" and \
                                     self.master.ip in node and \
                                    "fts_diag.json" not in LOG_FILE_NAMES:
                            LOG_FILE_NAMES.append("fts_diag.json")
                        if service.encode("ascii") == "index" and \
                                            self.master.ip in node:
                            if "indexer_mprof.log" not in LOG_FILE_NAMES:
                                LOG_FILE_NAMES.append("indexer_mprof.log")
                            if "indexer_pprof.log" not in LOG_FILE_NAMES:
                                LOG_FILE_NAMES.append("indexer_pprof.log")
                if self.debug_logs:
                    self.log.info('\nlog files sample: {0}'.format(LOG_FILE_NAMES))
                    self.log.info('\nlog files in zip: {0}'.format(output))

                for x in LOG_FILE_NAMES:
                    find_log = False
                    for output_line in output:
                        if output_line.find(x) >= 0:
                            find_log = True
                    if not find_log:
                        # syslog.tar.gz is missing on mac os, see ticket
                        # MB-9110; drop this special case once it is fixed
                        if x == "syslog.tar.gz" and info.distribution_type.lower() == "mac":
                            missing_logs = False
                        else:
                            missing_logs = True
                            self.log.error("The log zip file is missing %s" % (x))

                missing_buckets = False
                if not self.node_down:
                    for bucket in self.buckets:
                        command = "grep %s cbcollect_info*/stats.log" % (bucket.name)
                        output, error = self.shell.execute_command(command)
                        if self.debug_logs:
                            self.shell.log_command_output(output, error)
                        if len(error) > 0:
                            raise Exception("unable to grep key words. Check grep command output for help")
                        if len(output) == 0:
                            missing_buckets = True
                            self.log.error("%s stats are missing in stats.log" % (bucket.name))

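                # du -s prints "<size> <path>" per entry; a zero size means
                # an empty log file, which is only acceptable for kv_trace
                # files when the node was down during collection.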
                command = "du -s cbcollect_info*/*"
                output, error = self.shell.execute_command(command)
                if self.debug_logs:
                    self.shell.log_command_output(output, error)
                empty_logs = False
                if len(error) > 0:
                    raise Exception("unable to list file size. Check du command output for help")
                for output_line in output:
                    output_line = output_line.split()
                    file_size = int(output_line[0])
                    if self.debug_logs:
                        self.log.info("File size: %s" % file_size)
                    if file_size == 0:
                        if "kv_trace" in output_line[1] and self.node_down:
                            continue
                        else:
                            empty_logs = True
                            self.log.error("%s is empty" % (output_line[1]))

                if missing_logs:
                    raise Exception("Bad log file package generated: missing logs")
                if missing_buckets:
                    raise Exception("Bad stats.log: stats for some buckets are missing")
                if empty_logs:
                    raise Exception("Collected empty log files")
            elif os_name == "windows":
                # TODO: figure out what command works on windows for verification
                pass

        finally:
            self.shell.delete_files(zip_file)
            self.shell.delete_files("cbcollect_info*")

    def collectinfo_test_for_views(self):
        self.default_design_doc_name = "Doc1"
        self.view_name = self.input.param("view_name", "View")
        self.generate_map_reduce_error = self.input.param("map_reduce_error", False)
        self.default_map_func = 'function (doc) { emit(doc.age, doc.first_name);}'
        self.gen_load = BlobGenerator('couch', 'cb-', self.value_size, end=self.num_items)
        self._load_all_buckets(self.master, self.gen_load, "create", 0)
        self.reduce_fn = "_count"
        expected_num_items = self.num_items
        if self.generate_map_reduce_error:
            # _sum over the non-numeric first_name values forces a
            # map/reduce error, which should show up in the collected logs
            self.reduce_fn = "_sum"
            expected_num_items = None

        view = View(self.view_name, self.default_map_func, self.reduce_fn, dev_view=False)
        self.cluster.create_view(self.master, self.default_design_doc_name, view,
                                 'default', self.wait_timeout * 2)
        query = {"stale": "false", "connection_timeout": 60000}
        try:
            self.cluster.query_view(self.master, self.default_design_doc_name, self.view_name, query,
                                    expected_num_items, 'default', timeout=self.wait_timeout)
        except Exception:
            if not self.generate_map_reduce_error:
                raise
        self.shell.execute_cbcollect_info("%s.zip" % (self.log_filename))
        self.verify_results(self.log_filename)

    def test_default_collect_logs_in_cluster(self):
        """
           In a cluster, if we run cbcollectinfo from one node, it should
           collect logs from that node only.
           Initial nodes: 3
        """
        gen_load = BlobGenerator('cbcollect', 'cbcollect-', self.value_size,
                                                            end=self.num_items)
        self._load_all_buckets(self.master, gen_load, "create", 0)
        self._wait_for_stats_all_buckets(self.servers[:self.num_servers])
        self.log.info("Delete old logs files")
        self.shell.delete_files("%s.zip" % (self.log_filename))
        self.log.info("Delete old logs directory")
        self.shell.delete_files("cbcollect_info*")
        options = ""
        if self.collect_all_option:
            options = "--multi-node-diag"
            self.log.info("Run collect log with --multi-node-diag option")
        output, error = self.shell.execute_cbcollect_info("%s.zip %s"
                                                          % (self.log_filename, options))
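        # The collector output echoes the diag request parameters it used;
        # "oneNode=1" indicates diag was limited to the local node, which is
        # what the checks below look for.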
        if output:
            if self.debug_logs:
                self.shell.log_command_output(output, error)
            for line in output:
                if "--multi-node-diag" in options:
                    if "noLogs=1&oneNode=1" in line:
                        self.log.error("Error line: %s" % line)
                        self.fail("cbcollect got diag from 1 node only")
                if not options:
                    if "noLogs=1" in line:
                        if "oneNode=1" not in line:
                            self.log.error("Error line: %s" % line)
                            self.fail("cbcollect was not set to collect diag from 1 node only")
        self.verify_results(self.log_filename)

    def test_cbcollectinfo_memory_usuage(self):
        """
           Test to make sure cbcollectinfo does not use too much memory.
           We run the test with 200K items of size 128 bytes.
        """
        gen_load = BlobGenerator('cbcollect', 'cbcollect-', self.value_size,
                                                            end=200000)
        self._load_all_buckets(self.master, gen_load, "create", 0)
        self._wait_for_stats_all_buckets(self.servers[:self.num_servers])
        self.log.info("Delete old logs files")
        self.shell.delete_files("%s.zip" % (self.log_filename))
        self.log.info("Delete old logs directory")
        self.shell.delete_files("cbcollect_info*")
        options = ""
        if self.collect_all_option:
            options = "--multi-node-diag"
            self.log.info("Run collect log with --multi-node-diag option")

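        # Run collection in one thread while a second thread samples the
        # memory of the cbcollect_info process until collection finishes.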
        collect_threads = []
        col_thread = Thread(target=self.shell.execute_cbcollect_info,
                            args=("%s.zip" % (self.log_filename), options))
        collect_threads.append(col_thread)
        col_thread.start()
        monitor_mem_thread = Thread(target=self._monitor_collect_log_mem_process)
        collect_threads.append(monitor_mem_thread)
        monitor_mem_thread.start()
        self.thread_end = False
        while not self.thread_end:
            if not col_thread.isAlive():
                self.thread_end = True
            else:
                time.sleep(1)
        for t in collect_threads:
            t.join()

    def _monitor_collect_log_mem_process(self):
        shell = RemoteMachineShellConnection(self.master)
        vsz, rss = RemoteMachineHelper(shell).monitor_process_memory('cbcollect_info')
        # largest jump between consecutive samples, in KB
        vsz_delta = max(abs(x - y) for (x, y) in zip(vsz[1:], vsz[:-1]))
        rss_delta = max(abs(x - y) for (x, y) in zip(rss[1:], rss[:-1]))
        self.log.info("The largest delta in VSZ: %s KB " % vsz_delta)
        self.log.info("The largest delta in RSS: %s KB " % rss_delta)

        if vsz_delta > 20000:
            self.fail("cbcollect_info process memory spiked by more than 20 MB")