1import copy
2import json, filecmp, itertools
3import os, shutil, ast
4from threading import Thread
5
6from membase.api.rest_client import RestConnection
7from memcached.helper.data_helper import MemcachedClientHelper
8from TestInput import TestInputSingleton
9from clitest.cli_base import CliBaseTest
10from couchbase_helper.stats_tools import StatsCommon
11from couchbase_helper.cluster import Cluster
12from remote.remote_util import RemoteMachineShellConnection
13from membase.helper.bucket_helper import BucketOperationHelper
14from membase.helper.cluster_helper import ClusterOperationHelper
15from couchbase_helper.documentgenerator import BlobGenerator, JsonDocGenerator
16from couchbase_helper.data_analysis_helper import DataCollector
17from couchbase_cli import CouchbaseCLI
18from security.rbac_base import RbacBase
19from pprint import pprint
20from testconstants import CLI_COMMANDS, COUCHBASE_FROM_WATSON,\
21                          COUCHBASE_FROM_SPOCK, LINUX_COUCHBASE_BIN_PATH,\
22                          WIN_COUCHBASE_BIN_PATH, COUCHBASE_FROM_SHERLOCK
23
24
25class ImportExportTests(CliBaseTest):
26    def setUp(self):
27        super(ImportExportTests, self).setUp()
28        self.cluster_helper = Cluster()
29        self.ex_path = self.tmp_path + "export{0}/".format(self.master.ip)
30        self.num_items = self.input.param("items", 1000)
31        self.localhost = self.input.param("localhost", False)
32        self.json_create_gen = JsonDocGenerator("imex", op_type="create",
33                                       encoding="utf-8", start=0, end=self.num_items)
34        self.json_delete_gen = JsonDocGenerator("imex", op_type="delete",
35                                       encoding="utf-8", start=0, end=self.num_items)
36
37    def tearDown(self):
38        super(ImportExportTests, self).tearDown()
39        self.import_back = self.input.param("import_back", False)
40        ClusterOperationHelper.cleanup_cluster(self.servers, self.servers[0])
41        if self.import_back:
42            self.log.info("clean up server in import back tests")
43            imp_servers = copy.deepcopy(self.servers[2:])
44            BucketOperationHelper.delete_all_buckets_or_assert(imp_servers, self)
45            ClusterOperationHelper.cleanup_cluster(imp_servers, imp_servers[0])
46            ClusterOperationHelper.wait_for_ns_servers_or_assert(imp_servers, self)
47
48
49    def test_check_require_import_flags(self):
50        require_flags = ['Flag required, but not specified: -c/--cluster',
51                         'Flag required, but not specified: -u/--username',
52                         'Flag required, but not specified: -p/--password',
53                         'Flag required, but not specified: -b/--bucket',
54                         'Flag required, but not specified: -d/--dataset',
55                         'Flag required, but not specified: -f/--format']
56
57        cmd = "%s%s%s %s --no-ssl-verify"\
58                    % (self.cli_command_path, "cbimport", self.cmd_ext, self.imex_type)
59        output, error = self.shell.execute_command(cmd)
60        num_require_flags = 6
61        if self.imex_type == "csv":
62            num_require_flags = 5
63        self.log.info("output from command run %s " % output[:num_require_flags])
64        self.assertEqual(require_flags[:num_require_flags], output[:num_require_flags],
65                                       "Error in require flags of cbimport")
66
67    def test_check_require_export_flags(self):
68        require_flags = ['Flag required, but not specified: -c/--cluster',
69                         'Flag required, but not specified: -u/--username',
70                         'Flag required, but not specified: -p/--password',
71                         'Flag required, but not specified: -b/--bucket',
72                         'Flag required, but not specified: -f/--format',
73                         'Flag required, but not specified: -o/--output']
74
75        cmd = "%s%s%s %s --no-ssl-verify"\
76                    % (self.cli_command_path, "cbexport", self.cmd_ext, "json")
77        output, error = self.shell.execute_command(cmd)
78        self.log.info("output from command run %s " % output[:6])
79        self.assertEqual(require_flags[:6], output[:6],
80                                       "Error in require flags of cbexport")
81
82    def test_export_from_empty_bucket(self):
83        options = {"load_doc": False, "bucket":"empty"}
84        return self._common_imex_test("export", options)
85
86    def test_export_from_sasl_bucket(self):
87        options = {"load_doc": True, "docs":"1000"}
88        return self._common_imex_test("export", options)
89
90    def test_export_and_import_back(self):
91        options = {"load_doc": True, "docs":"1000"}
92        return self._common_imex_test("export", options)
93
94    def test_export_with_localhost(self):
95        """
96           Set localhost param = True
97           IP address will be replaced with localhost
98        """
99        options = {"load_doc": True, "docs":"1000"}
100        return self._common_imex_test("export", options)
101
102    def test_export_delete_expired_updated_data(self):
103        """
104           @delete_percent = percent keys deleted
105           @udpated = update doc
106           @update_field = [fields need to update] like update_field=['dept']
107             When do update_field, "mutated" should not = 0
108
109        """
110        username = self.input.param("username", None)
111        password = self.input.param("password", None)
112        delete_percent = self.input.param("delete_percent", None)
113        updated = self.input.param("updated", False)
114        update_field = self.input.param("update_field", None)
115        path = self.input.param("path", None)
116        self.ex_path = self.tmp_path + "export{0}/".format(self.master.ip)
117        master = self.servers[0]
118        server = copy.deepcopy(master)
119
120        if username is None:
121            username = server.rest_username
122        if password is None:
123            password = server.rest_password
124        self.cli_command_path = "cd %s; ./" % self.cli_command_path
125        self.buckets = RestConnection(server).get_buckets()
126        total_items = self.num_items
127        for bucket in self.buckets:
128            doc_create_gen = copy.deepcopy(self.json_create_gen)
129            self.cluster.load_gen_docs(self.master, bucket.name,
130                                               doc_create_gen, bucket.kvs[1], "create", compression=self.sdk_compression)
131            self.verify_cluster_stats(self.servers[:self.nodes_init], max_verify=total_items)
132            if delete_percent is not None:
133                self.log.info("Start to delete %s %% total keys" % delete_percent)
134                doc_delete_gen = copy.deepcopy(self.json_delete_gen)
135                doc_delete_gen.end = int(self.num_items) * int(delete_percent) / 100
136                total_items -= doc_delete_gen.end
137                self.cluster.load_gen_docs(self.master, bucket.name, doc_delete_gen,
138                                                            bucket.kvs[1], "delete", compression=self.sdk_compression)
139            if updated and update_field is not None:
140                self.log.info("Start update data")
141                doc_updated_gen = copy.deepcopy(self.json_create_gen)
142                doc_updated_gen.update(fields_to_update=update_field)
143                self.cluster.load_gen_docs(self.master, bucket.name, doc_updated_gen,
144                                                             bucket.kvs[1], "update", compression=self.sdk_compression)
145            self.verify_cluster_stats(self.servers[:self.nodes_init], max_verify=total_items)
146            """ remove previous export directory at tmp dir and re-create it
147                in linux:   /tmp/export
148                in windows: /cygdrive/c/tmp/export """
149            self.log.info("remove old export dir in %s" % self.tmp_path)
150            self.shell.execute_command("rm -rf {0}export{1} "\
151                                       .format(self.tmp_path, self.master.ip))
152            self.log.info("create export dir in {0}".format(self.tmp_path))
153            self.shell.execute_command("mkdir {0}export{1}"\
154                                       .format(self.tmp_path, self.master.ip))
155            """ /opt/couchbase/bin/cbexport json -c localhost -u Administrator
156                              -p password -b default -f list -o /tmp/test4.zip """
157            export_file = self.ex_path + bucket.name
158            if self.cmd_ext:
159                export_file = export_file.replace("/cygdrive/c", "c:")
160            exe_cmd_str = "%s%s%s %s -c %s -u %s -p %s -b %s -f %s -o %s"\
161                         % (self.cli_command_path, "cbexport", self.cmd_ext, self.imex_type,
162                                     server.ip, username, password, bucket.name,
163                                                    self.format_type, export_file)
164            output, error = self.shell.execute_command(exe_cmd_str)
165            self._check_output("successfully", output)
166            if self.format_type == "list":
167                self.log.info("remove [] in file")
168                self.shell.execute_command("sed%s -i '/^\[\]/d' %s"
169                                                 % (self.cmd_ext,export_file))
170            output, _ = self.shell.execute_command("gawk%s 'END {print NR}' %s"
171                                                     % (self.cmd_ext, export_file))
172            self.assertTrue(int(total_items) == int(output[0]),
173                                     "doc in bucket: %s != doc in export file: %s"
174                                      % (total_items, output[0]))
175
176    def test_export_from_dgm_bucket(self):
177        """
178           Need to add params below to test:
179            format_type=list (or lines)
180            dgm_run=True
181            active_resident_threshold=xx
182        """
183        options = {"load_doc": True, "docs":"1000"}
184        return self._common_imex_test("export", options)
185
186    def test_export_with_secure_connection(self):
187        """
188           Need to add params below to test:
189            format_type=list (or lines)
190            secure-conn=True (default False)
191           Return: None
192        """
193        options = {"load_doc": True, "docs":"1000"}
194        return self._common_imex_test("export", options)
195
196    def test_import_to_dgm_bucket(self):
197        """
198           Need to add params below to test:
199            format_type=list (or lines)
200            imex_type=json (tab or csv)
201            import_file=json_list_1000_lines (depend on above formats)
202            dgm_run=True
203            active_resident_threshold=xx
204        """
205        options = {"load_doc": True, "docs":"1000"}
206        return self._common_imex_test("import", options)
207
208    def test_imex_with_special_chars_in_password(self):
209        """
210           Import and export must accept special characters in password
211           like: #, $
212           This test needs to reset password back to original one.
213           Need param:
214             @param password
215             @param test_type
216             @format_type lines/list
217             @param import_file
218        """
219        options = {"load_doc": True, "docs":"1000"}
220        new_password = self.input.param("password", None)
221        if "hash" in new_password:
222            new_password = new_password.replace("hash", "#")
223        if "bang" in new_password:
224            new_password = new_password.replace("bang", "!")
225        rest_password = self.master.rest_password
226        command = "setting-cluster"
227        password_changed = False
228        try:
229            self.log.info("Change password to new one")
230            if self.input.param("password", None) is None:
231                self.fail("Need to pass param 'password' to run this test")
232            options_cli = "-u Administrator -p %s --cluster-password '%s' "\
233                                        % (rest_password, new_password)
234            output, _ = self.shell.couchbase_cli(command, self.master.ip, options_cli)
235            if self._check_output("SUCCESS", output):
236                self.log.info("Password was changed to %s " % new_password)
237                password_changed = True
238                self.master.rest_password = new_password
239            else:
240                self.fail("Fail to change password cluster to %s" % new_password)
241
242            self._common_imex_test(self.test_type, options)
243        finally:
244            if password_changed:
245                self.log.info("change password back to default in ini file")
246                options = "-u Administrator -p '%s' --cluster-password '%s' "\
247                                           % (new_password, rest_password)
248                output, _ = self.shell.couchbase_cli(command, self.master.ip, options)
249                if self._check_output("SUCCESS", output):
250                    self.log.info("Password was changed back to %s " % rest_password)
251                    self.master.rest_password = rest_password
252                else:
253                    self.fail("Fail to change password back to %s" % rest_password)
254
255    def test_imex_during_rebalance(self):
256        """ During rebalance, the test will execute the import/export command.
257            Tests will execute in both path, absolute and bin path.
258        """
259        self._load_doc_data_all_buckets()
260        task_reb = self.cluster.async_rebalance(self.servers, [self.servers[2]], [])
261        while task_reb.state != "FINISHED":
262            if len(self.buckets) >= 1:
263                for bucket in self.buckets:
264                    if self.test_type == "import":
265                        self.test_type = "cbimport"
266                        self._remote_copy_import_file(self.import_file)
267                        format_flag = "-f"
268                        field_separator_flag = ""
269                        if self.imex_type == "csv":
270                            format_flag = self.format_type = ""
271                            if self.field_separator != "comma":
272                                if self.field_separator == "tab":
273                                    """ we test tab separator in this case """
274                                    field_separator_flag = "--field-separator $'\\t' "
275                                else:
276                                    field_separator_flag = "--field-separator %s "\
277                                                              % self.field_separator
278                        key_gen = "key::%index%"
279                        if self.cmd_ext:
280                            self.des_file = self.des_file.replace("/cygdrive/c", "c:")
281                        imp_cmd_str = "%s%s%s %s -c %s -u Administrator -p password"\
282                                                    " -b %s -d %s%s %s %s -g %s %s "\
283                              % (self.cli_command_path, self.test_type, self.cmd_ext,
284                                     self.imex_type, self.servers[0].ip, bucket.name,
285                                                   self.import_method, self.des_file,
286                                              format_flag, self.format_type, key_gen,
287                                                                field_separator_flag)
288                        output, error = self.shell.execute_command(imp_cmd_str)
289                        self.log.info("Output from execute command %s " % output)
290                        if not self._check_output("successfully", output):
291                            self.fail("Fail to import json file")
292                    elif self.test_type == "export":
293                        self.test_type = "cbexport"
294                        self.shell.execute_command("rm -rf {0}export{1}"\
295                                                   .format(self.tmp_path,self.master.ip))
296                        self.shell.execute_command("mkdir {0}export{1}"\
297                                                   .format(self.tmp_path, self.master.ip))
298                        export_file = self.ex_path + bucket.name
299                        if self.imex_type == "json":
300                            if self.cmd_ext:
301                                export_file = export_file.replace("/cygdrive/c", "c:")
302                            exp_cmd_str = "%s%s%s %s -c %s -u Administrator -p password"\
303                                                            " -b %s -f %s -o %s"\
304                                  % (self.cli_command_path, self.test_type, self.cmd_ext,
305                                         self.imex_type, self.servers[0].ip, bucket.name,
306                                                               self.format_type, export_file)
307                            output, error = self.shell.execute_command(exp_cmd_str)
308                            self.log.info("Output from execute command %s " % output)
309                            if not self._check_output("successfully", output):
310                                self.fail("Fail to import json file")
311        task_reb.result()
312
313    def test_imex_non_default_port(self):
314        options = {"load_doc": True, "docs":"10"}
315        server = copy.deepcopy(self.servers[0])
316        import_method = self.input.param("import_method", "file://")
317        default_port = 8091
318        new_port = 9000
319        port_changed = False
320        test_failed = False
321        try:
322            """ change default port from 8091 to 9000 """
323            port_cmd = "%s%s%s %s -c %s:%s -u Administrator -p password --cluster-port=%s "\
324                                    % (self.cli_command_path, "couchbase-cli", self.cmd_ext,
325                                          "setting-cluster", server.ip, default_port, new_port)
326            output, error = self.shell.execute_command(port_cmd)
327            if self._check_output("SUCCESS", output):
328                self.log.info("Port was changed from 8091 to 9000")
329                port_changed = True
330            else:
331                self.fail("Fail to change port 8091 to 9000")
332            if self.test_type == "import":
333                self.test_type = "cbimport"
334                self._remote_copy_import_file(self.import_file)
335                if len(self.buckets) >= 1:
336                    if self.imex_type == "json":
337                        for bucket in self.buckets:
338                            key_gen = "key::%index%"
339                            """ ./cbimport json -c 12.11.10.132 -u Administrator -p password
340                        -b default -d file:///tmp/export/default -f list -g key::%index%  """
341                            imp_cmd_str = "%s%s%s %s -c %s:%s -u Administrator -p password "\
342                                                                 "-b %s -d %s%s -f %s -g %s"\
343                                     % (self.cli_command_path, self.test_type, self.cmd_ext,
344                                           self.imex_type, server.ip, new_port, bucket.name,
345                                    import_method, self.des_file, self.format_type, key_gen)
346                            output, error = self.shell.execute_command(imp_cmd_str)
347                            self.log.info("Output from execute command %s " % output)
348            elif self.test_type == "export":
349                self.test_type = "cbexport"
350                if len(self.buckets) >= 1:
351                    for bucket in self.buckets:
352                        self.log.info("load json to bucket %s " % bucket.name)
353                        load_cmd = "%s%s%s -n %s:%s -u Administrator -p password "\
354                                                                 "-j -i %s -b %s "\
355                            % (self.cli_command_path, "cbworkloadgen", self.cmd_ext,
356                               server.ip, new_port, options["docs"], bucket.name)
357                        self.shell.execute_command(load_cmd)
358                self.shell.execute_command("rm -rf {0}export{1}"\
359                                           .format(self.tmp_path, self.master.ip))
360                self.shell.execute_command("mkdir {0}export{1}"\
361                                           .format(self.tmp_path,self.master.ip))
362                """ /opt/couchbase/bin/cbexport json -c localhost -u Administrator
363                              -p password -b default -f list -o /tmp/test4.zip """
364                if len(self.buckets) >= 1:
365                    for bucket in self.buckets:
366                        export_file = self.ex_path + bucket.name
367                        if self.cmd_ext:
368                            export_file = export_file.replace("/cygdrive/c", "c:")
369                        exe_cmd_str = "%s%s%s %s -c %s:%s -u Administrator "\
370                                             "-p password -b %s -f %s -o %s"\
371                                    % (self.cli_command_path, self.test_type,
372                                       self.cmd_ext, self.imex_type, server.ip,
373                                       new_port, bucket.name, self.format_type,
374                                                                   export_file)
375                        self.shell.execute_command(exe_cmd_str)
376                        self._verify_export_file(bucket.name, options)
377        except Exception, e:
378            if e:
379                print "Exception throw: ", e
380            test_failed = True
381        finally:
382            if port_changed:
383                self.log.info("change port back to default port 8091")
384                port_cmd = "%s%s%s %s -c %s:%s -u Administrator -p password --cluster-port=%s "\
385                        % (self.cli_command_path, "couchbase-cli", self.cmd_ext,
386                           "setting-cluster", server.ip, new_port, default_port)
387                output, error = self.shell.execute_command(port_cmd)
388            if test_failed:
389                self.fail("Test failed.  Check exception throw above.")
390
391    def test_imex_flags(self):
392        """ imex_type     = json
393            cluster_flag  = -c
394            user_flag     = -u
395            password_flag = -p
396            bucket_flag   = -b
397            dataset_flag  = -d  // only import
398            format_flag   = -f
399            generate_flag = -g  // only import
400            output_flag   = -o  // only export
401            format_type = list/lines
402            import_file = json_list_1000_lines
403                                  =lines,....
404            ./cbimport json -c 12.11.10.132 -u Administrator -p password
405                -b default -d file:///tmp/export/default -f list -g key::%index% """
406        server = copy.deepcopy(self.servers[0])
407        self.sample_file = self.input.param("sample_file", None)
408        self.cluster_flag = self.input.param("cluster_flag", "-c")
409        self.user_flag = self.input.param("user_flag", "-u")
410        self.password_flag = self.input.param("password_flag", "-p")
411        self.bucket_flag = self.input.param("bucket_flag", "-b")
412        self.dataset_flag = self.input.param("dataset_flag", "-d")
413        self.format_flag = self.input.param("format_flag", "-f")
414        self.generate_flag = self.input.param("generate_flag", "-g")
415        self.output_flag = self.input.param("output_flag", "-o")
416        data_path = self.tmp_path
417        if self.cmd_ext:
418            data_path = self.tmp_path_raw
419        if self.test_type == "import":
420            cmd = "cbimport"
421            cmd_str = "%s%s%s %s %s %s %s Administrator %s password %s default %s "\
422                                     "file://%sdefault  %s lines %s key::%%index%%"\
423                            % (self.cli_command_path, cmd, self.cmd_ext,
424                           self.imex_type, self.cluster_flag, server.ip,
425                           self.user_flag, self.password_flag, self.bucket_flag,
426                           self.dataset_flag, data_path, self.format_flag,
427                           self.generate_flag)
428        elif self.test_type == "export":
429            cmd = "cbexport"
430            self.shell.execute_command("rm -rf {0}export{1}"\
431                                       .format(self.tmp_path, self.master.ip))
432            self.shell.execute_command("mkdir {0}export{1}" \
433                                       .format(self.tmp_path, self.master.ip))
434            export_file = self.ex_path + "default"
435            cmd_str = "%s%s%s %s %s %s %s Administrator %s password %s default "\
436                                                             "  %s lines %s %s "\
437                            % (self.cli_command_path, cmd, self.cmd_ext,
438                           self.imex_type, self.cluster_flag, server.ip,
439                           self.user_flag, self.password_flag, self.bucket_flag,
440                           self.format_flag, self.output_flag, export_file)
441        output, error = self.shell.execute_command(cmd_str)
442        if self.imex_type == "":
443            if "Unknown flag: -c" in output:
444                self.log.info("%s detects missing 'json' option " % self.test_type)
445            else:
446                self.fail("%s could not detect missing 'json' option"
447                                                             % self.test_type)
448        if self.cluster_flag == "":
449            if "Invalid subcommand `%s`" % server.ip in output \
450                                  and "Required Flags:" in output:
451                self.log.info("%s detected missing '-c or --clusger' flag"
452                                                             % self.test_type)
453            else:
454                self.fail("%s could not detect missing '-c or --cluster' flag"
455                                                              % self.test_type)
456        if self.user_flag == "":
457            if "Expected flag: Administrator" in output \
458                             and "Required Flags:" in output:
459                self.log.info("%s detected missing '-u or --username' flag"
460                                                          % self.test_type)
461            else:
462                self.fail("%s could not detect missing '-u or --username' flag"
463                                                              % self.test_type)
464        if self.password_flag == "":
465            if "Expected flag: password" in output \
466                             and "Required Flags:" in output:
467                self.log.info("%s detected missing '-p or --password' flag"
468                                                          % self.test_type)
469            else:
470                self.fail("%s could not detect missing '-p or --password' flag"
471                                                          % self.test_type)
472        if self.bucket_flag == "":
473            if "Expected flag: default" in output \
474                             and "Required Flags:" in output:
475                self.log.info("%s detected missing '-b or --bucket' flag"
476                                                          % self.test_type)
477            else:
478                self.fail("%s could not detect missing '-b or --bucket' flag"
479                                                          % self.test_type)
480        if self.dataset_flag == "" and self.test_type == "import":
481            if "Expected flag: file://%sdefault" % data_path in output \
482                             and "Required Flags:" in output:
483                self.log.info("%s detected missing '-d or --dataset' flag"
484                                                          % self.test_type)
485            else:
486                self.fail("%s could not detect missing '-d or --dataset' flag"
487                                                          % self.test_type)
488        if self.format_flag == "":
489            if "Expected flag: lines" in output \
490                                 and "Required Flags:" in output:
491                self.log.info("%s detected missing '-f or --format' flag"
492                                                          % self.test_type)
493            else:
494                self.fail("%s could not detect missing '-f or --format' flag"
495                                                          % self.test_type)
496        if self.generate_flag == "" and self.test_type == "import":
497            if "Expected flag: %index%" in output \
498                             and "Required Flags:" in output:
499                self.log.info("%s detected missing '-g or --generate' flag"
500                                                          % self.test_type)
501            else:
502                self.fail("%s could not detect missing '-g or --generate' flag"
503                                                          % self.test_type)
504        if self.output_flag == "" and self.test_type == "export":
505            if "Expected flag: /tmp/export/default" in output \
506                             and "Required Flags:" in output:
507                self.log.info("%s detected missing '-o or --output' flag"
508                                                          % self.test_type)
509            else:
510                self.fail("%s could not detect missing '-o or --output' flag"
511                                                          % self.test_type)
512        self.log.info("Output from execute command %s " % output)
513
    def test_imex_optional_flags(self):
        """Run cbimport / cbexport (json only) with the optional CLI flags.

        Test params read here:
            threads_flag     -> -t <value>   ("empty" passes -t with no value)
            errors_flag      -> -e <path>    one of "empty", "error",
                                             "relative_path", "absolute_path";
                                             only put on the cbimport command
            logs_flag        -> -l <path>    one of "empty", "log",
                                             "relative_path", "absolute_path"
            include_key_flag -> --include-key <value>   (cbexport only)
            import_file, import_method, output_flag

        The "relative_path" variants are skipped on windows (the method
        returns early).  The test fails when _check_output_option_flags
        returns a truthy value and the command output does not contain
        "successfully", or when the --include-key verification fails.
        """
        server = copy.deepcopy(self.servers[0])
        self.threads_flag = self.input.param("threads_flag", "")
        self.logs_flag = self.input.param("logs_flag", "")
        self.include_key_flag = self.input.param("include_key_flag", "")
        self.import_file = self.input.param("import_file", None)
        self.errors_flag = self.input.param("errors_flag", "")
        import_method = self.input.param("import_method", "file://")
        self.output_flag = self.input.param("output_flag", "-o")
        # -t: any non-empty param enables the flag; "empty" means the flag is
        # passed without a value.
        threads_flag = ""
        if self.threads_flag != "":
            threads_flag = "-t"
            if self.threads_flag == "empty":
                self.threads_flag = ""
        # -e: start from a fresh <tmp_path>errors directory, then pick the
        # path variant under test.
        errors_flag = ""
        errors_path = ""
        if self.errors_flag != "":
            errors_flag = "-e"
            self.shell.execute_command("rm -rf %serrors" % self.tmp_path)
            self.shell.execute_command("mkdir %serrors" % self.tmp_path)
            if self.errors_flag == "empty":
                errors_path = ""
            elif self.errors_flag == "error":
                # Bare file name "error"; remove a stale copy next to the CLI.
                # NOTE(review): when cli_command_path has the "cd <dir>; ./"
                # form, only "; ./" is stripped, so the rm argument still
                # starts with "cd " - confirm this is the intended target.
                errors_path = self.errors_flag
                if "; ./" in self.cli_command_path:
                    self.shell.execute_command("rm -rf %serror"
                                % self.cli_command_path.replace("; ./", ""))
                else:
                    self.shell.execute_command("rm -rf %serror"
                                                   % self.cli_command_path)
            elif self.errors_flag == "relative_path":
                if self.os == 'windows':
                    self.log.info("skip relative path test for -e flag on windows")
                    return
                errors_path = "~/error"
                self.shell.execute_command("rm -rf ~/error")
            elif self.errors_flag == "absolute_path":
                errors_path = self.tmp_path_raw + "errors/" + self.errors_flag
        # -l: same procedure as -e, using the <tmp_path>logs directory.
        logs_flag = ""
        logs_path = ""
        if self.logs_flag != "":
            logs_flag = "-l"
            self.shell.execute_command("rm -rf %slogs" % self.tmp_path)
            self.shell.execute_command("mkdir %slogs" % self.tmp_path)
            if self.logs_flag == "empty":
                logs_path = ""
            elif self.logs_flag == "log":
                # Bare file name "log"; see the NOTE(review) on the -e branch.
                logs_path = self.logs_flag
                if "; ./" in self.cli_command_path:
                    self.shell.execute_command("rm -rf %slog"
                                % self.cli_command_path.replace("; ./", ""))
                else:
                    self.shell.execute_command("rm -rf %slog"
                                                    % self.cli_command_path)
            elif self.logs_flag == "relative_path":
                if self.os == 'windows':
                    self.log.info("skip relative path test for -l flag on windows")
                    return
                logs_path = "~/log"
                self.shell.execute_command("rm -rf ~/log")
            elif self.logs_flag == "absolute_path":
                logs_path = self.tmp_path_raw + "logs/" + self.logs_flag
        # With a command extension set (presumably ".exe" on windows - confirm)
        # a cygwin-style logs path is rewritten to a drive-letter path.
        if self.cmd_ext:
            if logs_path and logs_path.startswith("/cygdrive/"):
                logs_path = logs_path.replace("/cygdrive/c", "c:")
        if self.test_type == "import":
            cmd = "cbimport"
            # presumably this copies the file and sets self.des_file, which is
            # read below - verify against _remote_copy_import_file.
            self._remote_copy_import_file(self.import_file)
            if self.imex_type == "json":
                for bucket in self.buckets:
                    """ ./cbimport json -c 12.11.10.132 -u Administrator -p password
                        -b default -d file:///tmp/export/default -f list -g %index%  """
                    des_file = self.des_file
                    if "/cygdrive/c" in des_file:
                        des_file = des_file.replace("/cygdrive/c", "c:")
                    imp_cmd_str = "%s%s%s %s -c %s -u Administrator -p password -b %s "\
                                  "-d %s%s -f %s -g key::%%index%% %s %s %s %s %s %s"\
                             % (self.cli_command_path, cmd, self.cmd_ext,
                                self.imex_type, server.ip, bucket.name,
                                import_method,
                                des_file,
                                self.format_type, threads_flag,
                                self.threads_flag,
                                errors_flag, errors_path,
                                logs_flag, logs_path)
                    self.log.info("command to run %s " % imp_cmd_str)
                    output, error = self.shell.execute_command(imp_cmd_str)
                    self.log.info("Output from execute command %s " % output)
                    error_check = self._check_output_option_flags(output,
                                                  errors_path, logs_path)
                    # Remove the log / error files before judging the result,
                    # so they are cleaned up even when the test fails below.
                    if logs_path:
                        self.shell.execute_command("rm -rf %s" % logs_path)
                    if errors_path:
                        self.shell.execute_command("rm -rf %s" % errors_path)
                    if error_check and not self._check_output("successfully", output):
                        self.fail("failed to run optional flags")
        elif self.test_type == "export":
            cmd = "cbexport"
            include_flag = ""
            if self.include_key_flag:
                include_flag = " --include-key"
            # Start from an empty per-master export directory.
            self.shell.execute_command("rm -rf {0}export{1}"\
                                       .format(self.tmp_path, self.master.ip))
            self.shell.execute_command("mkdir {0}export{1}"\
                                       .format(self.tmp_path, self.master.ip))
            if self.imex_type == "json":
                for bucket in self.buckets:
                    # Populate the bucket with cbworkloadgen json docs first.
                    self.log.info("load json to bucket %s " % bucket.name)
                    load_cmd = "%s%s%s -n %s:8091 -u Administrator -p password -j "\
                                                                     "-i %s -b %s "\
                            % (self.cli_command_path, "cbworkloadgen", self.cmd_ext,
                                             server.ip, self.num_items, bucket.name)
                    self.shell.execute_command(load_cmd)
                    # export_file keeps the shell path (used by "cat" below);
                    # export_file_cmd is the variant handed to cbexport.
                    export_file = self.ex_path + bucket.name
                    export_file_cmd = export_file
                    if "/cygdrive/c" in export_file_cmd:
                        export_file_cmd = export_file_cmd.replace("/cygdrive/c", "c:")
                    cmd_str = "%s%s%s %s -c %s -u Administrator -p password -b %s "\
                                    "  -f %s %s %s %s %s %s %s %s %s"\
                                     % (self.cli_command_path, cmd, self.cmd_ext,
                                        self.imex_type, server.ip, bucket.name,
                                        self.format_type, self.output_flag,
                                        export_file_cmd,
                                        threads_flag, self.threads_flag,
                                        logs_flag, logs_path,
                                        include_flag, self.include_key_flag)
                    output, error = self.shell.execute_command(cmd_str)
                    self.log.info("Output from execute command %s " % output)
                    # errors_path is only checked here; -e is not part of the
                    # cbexport command line built above.
                    error_check = self._check_output_option_flags(output,
                                                              errors_path, logs_path)
                    if logs_path:
                        self.shell.execute_command("rm -rf %s" % logs_path)
                    if error_check and not self._check_output("successfully", output):
                        self.fail("failed to run optional flags")
                    if self.include_key_flag:
                        # Each line of the exported file is parsed as a literal
                        # and the field named by include_key_flag must hold a
                        # key starting with "pymc".
                        self.log.info("Verify export with --include-key flag")
                        output, _ = self.shell.execute_command("cat %s" % export_file)
                        if output:
                            for x in output:
                                eval_x = ast.literal_eval(x)
                                if not eval_x[self.include_key_flag].startswith("pymc"):
                                    self.fail("Flag %s failed to include key "
                                              % include_flag)
                                else:
                                    self.log.info("Data for %s flag is verified"
                                                  % include_flag)
665
666    def test_import_invalid_folder_structure(self):
667        """ not in 4.6 """
668        options = {"load_doc": False}
669        return self._common_imex_test("import", options)
670
671    """ /opt/couchbase/bin/cbimport json -c 12.11.10.130:8091
672       -u Administrator -p password  -b travel-sample
673       -d /opt/couchbase/samples/travel-sample.zip -f sample """
674    def test_import_invalid_json_sample(self):
675        options = {"load_doc": False}
676        return self._common_imex_test("import", options)
677
678    def test_import_json_sample(self):
679        """ test_import_json_sample
680           -p default_bucket=False,imex_type=json,sample_file=travel-sample """
681        username = self.input.param("username", None)
682        password = self.input.param("password", None)
683        self.sample_file = self.input.param("sample_file", None)
684        self.imex_type = self.input.param("imex_type", None)
685        sample_file_path = self.sample_files_path + self.sample_file + ".zip"
686        server = copy.deepcopy(self.servers[0])
687        if username is None:
688            username = server.rest_username
689        if password is None:
690            password = server.rest_password
691        if self.sample_file is not None:
692            cmd = "cbimport"
693            imp_cmd_str = "%s%s%s %s -c %s -u %s -p %s -b %s -d %s -f sample"\
694                             % (self.cli_command_path, cmd, self.cmd_ext, self.imex_type,
695                                         server.ip, username, password, self.sample_file,
696                                                                      sample_file_path)
697            output, error = self.shell.execute_command(imp_cmd_str)
698            if not self._check_output("SUCCESS", output):
699                self.log.info("Output from command %s" % output)
700                self.fail("Failed to load sample file %s" % self.sample_file)
701
    """ params, e.g.:
          imex_type=json,format_type=list,import_file=json_list_1000_lines
        or format_type=lines with a matching lines-format import_file, ... """
704    def test_import_json_file(self):
705        options = {"load_doc": False, "docs": "1000"}
706        self.import_file = self.input.param("import_file", None)
707        return self._common_imex_test("import", options)
708
709    def test_import_json_generate_keys(self):
710        options = {"load_doc": False, "docs": "1000"}
711        return self._common_imex_test("import", options)
712
713    def test_import_json_with_limit_n_docs(self):
714        options = {"load_doc": False, "docs": "1000"}
715        return self._common_imex_test("import", options)
716
717    def test_import_json_with_skip_n_docs(self):
718        """
719           import docs with option to skip n docs
720           flag --skip-docs
721           :return: None
722        """
723        options = {"load_doc": False, "docs": "1000"}
724        return self._common_imex_test("import", options)
725
726    def test_import_json_with_skip_limit_n_docs(self):
727        """
728           import docs with option skip n docs and
729           limit n docs to be imported
730        """
731        options = {"load_doc": False, "docs": "1000"}
732        return self._common_imex_test("import", options)
733
734    def test_import_csv_with_limit_n_rows(self):
735        """
736           import csv with option to limit number of rows
737           flag --limit-rows
738           :return: None
739        """
740        options = {"load_doc": False, "docs": "1000"}
741        return self._common_imex_test("import", options)
742
743    def test_import_csv_with_skip_n_rows(self):
744        """
745           import csv with option to limit number of rows
746           flag --skip-rows
747           :return: None
748        """
749        options = {"load_doc": False, "docs": "1000"}
750        return self._common_imex_test("import", options)
751
752    def test_import_csv_with_skip_limit_n_rows(self):
753        """
754           import csv with option skip n rows and
755           limit n rows
756           params:
757             imex_type=json,format_type=lines,import_file=json_1000_lines,
758           skip-docs=100,limit-docs=20,nodes_init=2,verify-data=True
759        :return: None
760        """
761        options = {"load_doc": False, "docs": "1000"}
762        return self._common_imex_test("import", options)
763
764    def test_import_csv_file(self):
765        options = {"load_doc": False, "docs": "1000"}
766        return self._common_imex_test("import", options)
767
768    def test_import_csv_with_infer_types(self):
769        options = {"load_doc": False, "docs": "1000"}
770        return self._common_imex_test("import", options)
771
772    def test_import_csv_with_omit_empty(self):
773        options = {"load_doc": False, "docs": "1000"}
774        return self._common_imex_test("import", options)
775
776    def test_import_with_secure_connection(self):
777        """
778            For csv, require param key-gen=False
779        """
780        options = {"load_doc": False, "docs": "1000"}
781        return self._common_imex_test("import", options)
782
783    def _common_imex_test(self, cmd, options):
784        username = self.input.param("username", None)
785        password = self.input.param("password", None)
786        if password:
787            if "hash" in password:
788                password = password.replace("hash", "#")
789            if "bang" in password:
790                password = password.replace("bang", "!")
791        path = self.input.param("path", None)
792        self.pre_imex_ops_keys = 0
793        self.short_flag = self.input.param("short_flag", True)
794        import_method = self.input.param("import_method", "file://")
795        if "url" in import_method:
796            import_method = ""
797        self.ex_path = self.tmp_path + "export{0}/".format(self.master.ip)
798        master = self.servers[0]
799        server = copy.deepcopy(master)
800
801        if username is None:
802            username = server.rest_username
803        if password is None:
804            password = server.rest_password
805        if path is None:
806            self.log.info("test with absolute path ")
807        elif path == "local":
808            self.log.info("test with local bin path ")
809            self.cli_command_path = "cd %s; ./" % self.cli_command_path
810        self.buckets = RestConnection(server).get_buckets()
811        res_status = ""
812        random_key = self.key_generator()
813        kv_gen = BlobGenerator(random_key, "%s-" % random_key,
814                                                   self.value_size,
815                                                   start=0,
816                                                   end=50000)
817        url_format = ""
818        secure_port = ""
819        secure_conn = ""
820        if self.secure_conn:
821            # bin_path, cert_path, user, password, server_cert
822            cacert = self.shell.get_cluster_certificate_info(self.cli_command_path,
823                                                             self.tmp_path_raw,
824                                                             "Administrator",
825                                                             "password",
826                                                             self.master)
827            secure_port = "1"
828            url_format = "s"
829            if not self.no_cacert:
830                secure_conn = "--cacert %s" % cacert
831            if self.no_ssl_verify:
832                secure_conn = "--no-ssl-verify"
833        if "export" in cmd:
834            cmd = "cbexport"
835            if options["load_doc"]:
836                if len(self.buckets) >= 1:
837                    for bucket in self.buckets:
838                        self.log.info("load json to bucket %s " % bucket.name)
839                        load_cmd = "%s%s%s -n %s:8091 -u %s -p '%s' -j -i %s -b %s "\
840                            % (self.cli_command_path, "cbworkloadgen", self.cmd_ext,
841                               server.ip, username, password, options["docs"],
842                               bucket.name)
843                        if self.dgm_run and self.active_resident_threshold:
844                            """ disable auto compaction so that bucket could
845                                go into dgm faster.
846                            """
847                            self.rest.disable_auto_compaction()
848                            self.log.info("**** Load bucket to %s of active resident"\
849                                          % self.active_resident_threshold)
850                            self.shell.execute_command("{0}{1}{2} bucket-edit -c {3}:8091 "\
851                                                       " -u Administrator -p password "\
852                                                       "--bucket {4} --bucket-ramsize 100"\
853                                                       .format(self.cli_command_path,
854                                                        "couchbase-cli", self.cmd_ext,
855                                                        server.ip, bucket.name))
856                            self._load_all_buckets(self.master, kv_gen, "create", 0)
857                        self.log.info("load sample data to bucket")
858                        self.shell.execute_command(load_cmd)
859            """ remove previous export directory at tmp dir and re-create it
860                in linux:   /tmp/export
861                in windows: /cygdrive/c/tmp/export """
862            self.log.info("remove old export dir in %s" % self.tmp_path)
863            self.shell.execute_command("rm -rf {0}export{1}"\
864                                       .format(self.tmp_path, self.master.ip))
865            self.log.info("create export dir in %s" % self.tmp_path)
866            self.shell.execute_command("mkdir {0}export{1}"\
867                                       .format(self.tmp_path, self.master.ip))
868            if self.check_preload_keys:
869                for bucket in self.buckets:
870                    self.cluster_helper.wait_for_stats([self.master], bucket.name, "",
871                                                                "ep_queue_size", "==", 0)
872                    self.pre_imex_ops_keys = \
873                            RestConnection(self.master).get_active_key_count(bucket.name)
874            """ /opt/couchbase/bin/cbexport json -c localhost -u Administrator
875                              -p password -b default -f list -o /tmp/test4.zip """
876            if len(self.buckets) >= 1:
877                for bucket in self.buckets:
878                    stats_all_buckets = {}
879                    stats_all_buckets[bucket.name] = StatsCommon()
880                    export_file = self.ex_path + bucket.name
881                    if self.cmd_ext:
882                        export_file = export_file.replace("/cygdrive/c", "c:")
883                    if self.localhost:
884                        server.ip = "localhost"
885                    exe_cmd_str = "%s%s%s %s -c http%s://%s:%s8091 -u %s -p '%s' " \
886                                  " -b %s -f %s %s -o %s -t 4"\
887                                  % (self.cli_command_path, cmd, self.cmd_ext,
888                                     self.imex_type, url_format, server.ip,
889                                     secure_port, username, password, bucket.name,
890                                     self.format_type, secure_conn, export_file)
891                    if self.dgm_run:
892                        res_status = stats_all_buckets[bucket.name].get_stats([self.master],
893                                     bucket, '', 'vb_active_perc_mem_resident')[self.master]
894                        while int(res_status) > self.active_resident_threshold:
895                            self.sleep(5)
896                            res_status = stats_all_buckets[bucket.name].get_stats([self.master],
897                                         bucket, '', 'vb_active_perc_mem_resident')[self.master]
898                        if int(res_status) <= self.active_resident_threshold:
899                            self.log.info("Clear terminal")
900                            self.shell.execute_command('printf "\033c"')
901                    output, error = self.shell.execute_command(exe_cmd_str)
902                    data_exported = True
903                    if self.secure_conn:
904                        if self.no_ssl_verify:
905                            if not self._check_output("successfully", output):
906                                data_exported = False
907                                self.fail("Fail to export with no-ssl-verify flag")
908                        elif self.no_cacert:
909                            if self._check_output("successfully", output):
910                                data_exported = False
911                                self.fail("Secure connection works without cacert")
912                        elif not self._check_output("successfully", output):
913                            data_exported = False
914                            self.fail("Failed export json in secure connection")
915                    elif not self._check_output("successfully", output):
916                        data_exported = False
917                        self.fail("Failed to export json data")
918                    if data_exported:
919                        self._verify_export_file(bucket.name, options)
920
921            if self.import_back:
922                import_file = export_file
923                import_servers = copy.deepcopy(self.servers)
924                imp_rest = RestConnection(import_servers[2])
925                import_shell = RemoteMachineShellConnection(import_servers[2])
926                imp_rest.force_eject_node()
927                self.sleep(2)
928
929                imp_rest = RestConnection(import_servers[2])
930                status = False
931                info = imp_rest.get_nodes_self()
932                if info.memoryQuota and int(info.memoryQuota) > 0:
933                    self.quota = info.memoryQuota
934                imp_rest.init_node()
935                self.cluster.rebalance(import_servers[2:], [import_servers[3]], [])
936
937                """ Add built-in user cbadminbucket to second cluster """
938                self.log.info("add built-in user cbadminbucket to second cluster.")
939                testuser = [{'id': 'cbadminbucket', 'name': 'cbadminbucket', 'password': 'password'}]
940                RbacBase().create_user_source(testuser, 'builtin', import_servers[2])
941                self.sleep(10)
942                """ Assign user to role """
943                role_list = [{'id': 'cbadminbucket', 'name': 'cbadminbucket', 'roles': 'admin'}]
944                RbacBase().add_user_role(role_list, RestConnection(import_servers[2]), 'builtin')
945                self.sleep(10)
946
947                bucket_params=self._create_bucket_params(server=import_servers[2],
948                                        size=250,
949                                        replicas=self.num_replicas,
950                                        enable_replica_index=self.enable_replica_index,
951                                        eviction_policy=self.eviction_policy)
952                self.cluster.create_default_bucket(bucket_params)
953                imp_cmd_str = "%s%s%s %s -c %s -u %s -p '%s' -b %s "\
954                                        "-d file://%s -f %s -g key::%%%s%%"\
955                                        % (self.cli_command_path, "cbimport",
956                                           self.cmd_ext, self.imex_type,
957                                           import_servers[2].ip, username, password,
958                                           "default", import_file, self.format_type,
959                                           "index")
960                output, error = import_shell.execute_command(imp_cmd_str)
961                if self._check_output("error", output):
962                    self.fail("Fail to run import back to bucket")
963        elif "import" in cmd:
964            cmd = "cbimport"
965            if import_method != "":
966                self.im_path = self.tmp_path + "import/"
967                self.log.info("copy import file from local to remote")
968                skip_lines = ""
969                if self.skip_docs:
970                    skip_lines = " --skip-docs %s " % self.skip_docs
971                limit_lines = ""
972                if self.limit_docs:
973                    limit_lines = " --limit-docs %s " % self.limit_docs
974                if self.limit_rows:
975                    limit_lines = " --limit-rows %s " % self.limit_rows
976                if self.skip_rows:
977                    skip_lines = " --skip-rows %s " % self.skip_rows
978                omit_empty = ""
979                if self.omit_empty:
980                    omit_empty = " --omit-empty "
981                infer_types = ""
982                if self.infer_types:
983                    infer_types = " --infer-types "
984                json_invalid_errors_file = ""
985                if self.json_invalid_errors:
986                    self.log.info("Remove old json invalid error file")
987                    json_invalid_errors_file = "-e %sinvalid_error" % self.tmp_path
988                    self.shell.execute_command("rm -rf %s"
989                                                     % json_invalid_errors_file[3:])
990                fx_generator = ""
991                if self.fx_generator:
992                    fx_generator = "::#%s#" % self.fx_generator.upper()
993                if self.fx_generator and self.fx_gen_start:
994                    fx_generator = "::#%s[%s]#" \
995                                    % (self.fx_generator.upper(), self.fx_gen_start)
996                output, error = self.shell.execute_command("ls %s " % self.tmp_path)
997                if self._check_output("import", output):
998                    self.log.info("remove %simport directory" % self.tmp_path)
999                    self.shell.execute_command("rm -rf  %simport " % self.tmp_path)
1000                    output, error = self.shell.execute_command("ls %s " \
1001                                                                   % self.tmp_path)
1002                    if self._check_output("import", output):
1003                        self.fail("fail to delete import dir ")
1004                self.shell.execute_command("mkdir  %simport " % self.tmp_path)
1005                if self.import_file is not None:
1006                    src_file = "resources/imex/"+ self.import_file
1007                else:
1008                    self.fail("Need import_file param")
1009                des_file = self.im_path + self.import_file
1010                self.shell.copy_file_local_to_remote(src_file, des_file)
1011            else:
1012                des_file = self.import_file
1013
1014            if len(self.buckets) >= 1:
1015                format_flag = "-f"
1016                field_separator_flag = ''
1017                if self.imex_type == "csv":
1018                    format_flag = ""
1019                    self.format_type = ""
1020                    if self.field_separator != "comma":
1021                        if self.field_separator == "tab":
1022                            """ we test tab separator in this case """
1023                            field_separator_flag = "--field-separator $'\\t' "
1024                        else:
1025                            field_separator_flag = "--field-separator %s " \
1026                                                            % self.field_separator
1027                for bucket in self.buckets:
1028                    key_gen = "-g %index%"
1029                    if self.key_gen:
1030                        key_gen = "-g key::%index%"
1031                    if self.field_substitutions:
1032                        key_gen = "-g key::%{0}%".format(self.field_substitutions)
1033                        options["field_substitutions"] = key_gen[3:]
1034                    """ ./cbimport json -c 12.11.10.132 -u Administrator -p password
1035                        -b default -d file:///tmp/export/default -f list -g key::%index%
1036                    """
1037                    if self.cmd_ext:
1038                        des_file = des_file.replace("/cygdrive/c", "c:")
1039                    imp_cmd_str = "%s%s%s %s -c http%s://%s:%s8091 -u %s -p '%s' " \
1040                                  "-b %s -d %s%s %s %s "\
1041                                  " %s%s %s %s %s %s %s %s %s"\
1042                                       % (self.cli_command_path, cmd, self.cmd_ext,
1043                                          self.imex_type,
1044                                          url_format, server.ip, secure_port,
1045                                          username, password,
1046                                          bucket.name,
1047                                          import_method, des_file,
1048                                          format_flag,
1049                                          self.format_type,
1050                                          key_gen, fx_generator,
1051                                          field_separator_flag,
1052                                          limit_lines, skip_lines,
1053                                          omit_empty, infer_types,
1054                                          secure_conn, json_invalid_errors_file)
1055                    print "\ncommand format: ", imp_cmd_str
1056                    if self.dgm_run and self.active_resident_threshold:
1057                        """ disable auto compaction so that bucket could
1058                            go into dgm faster.
1059                        """
1060                        RestConnection(self.master).disable_auto_compaction()
1061                        self.log.info("**** Load bucket to %s of active resident"\
1062                                          % self.active_resident_threshold)
1063                        self.shell.execute_command("{0}{1}{2} bucket-edit -c {3}:8091 "\
1064                                                       " -u Administrator -p password "\
1065                                                       "--bucket {4} --bucket-ramsize 100"\
1066                                                       .format(self.cli_command_path,
1067                                                        "couchbase-cli", self.cmd_ext,
1068                                                        server.ip, bucket.name))
1069                        self._load_all_buckets(self.master, kv_gen, "create", 0)
1070                    self.cluster_helper.wait_for_stats([self.master], bucket.name, "",
1071                                                             "ep_queue_size", "==", 0)
1072                    if self.check_preload_keys:
1073                        self.cluster_helper.wait_for_stats([self.master], bucket.name, "",
1074                                                                 "ep_queue_size", "==", 0)
1075                        self.pre_imex_ops_keys = \
1076                            RestConnection(self.master).get_active_key_count(bucket.name)
1077
1078                    self.log.info("Import data to bucket")
1079                    output, error = self.shell.execute_command(imp_cmd_str)
1080                    if error:
1081                        self.fail("\nFailed to run command %s\nError:\n %s"
1082                                  % (imp_cmd_str, error))
1083                    self.log.info("Output from execute command %s " % output)
1084                    """ Json `file:///root/json_list` imported to `http://host:8091`
1085                        successfully
1086                    """
1087
1088                    data_loaded = False
1089                    if self.secure_conn:
1090                        if self.no_ssl_verify:
1091                            if self._check_output("successfully", output):
1092                                data_loaded = True
1093                            else:
1094                                self.fail("Fail to import with no-ssl-verify flag")
1095                        elif self.no_cacert:
1096                            if self._check_output("successfully", output):
1097                                self.fail("Secure connection works without cacert")
1098                        elif self._check_output("successfully", output):
1099                            data_loaded = True
1100                        else:
1101                            self.fail("Failed import data in secure connection")
1102                    elif "invalid" in self.import_file:
1103                        if self.json_invalid_errors:
1104                            output1, error1 = self.shell.execute_command("cat %s"
1105                                                    % json_invalid_errors_file[3:])
1106                            if output1:
1107                                self.log.info("\n** Invalid json line in error file **\n"
1108                                              "=> %s" % output1)
1109                                if '"name":: "pymc272"' not in output1[0]:
1110                                    self.fail("Failed to write error json line to file")
1111                    elif self._check_output("successfully", output):
1112                        data_loaded = True
1113
1114                    if not data_loaded:
1115                        if self.secure_conn and not self.no_cacert:
1116                           self.fail("Failed to execute command")
1117                    self.sleep(5)
1118                    if data_loaded:
1119                        self._verify_import_data(options)
1120
1121    def _verify_import_data(self, options):
1122        self.buckets = RestConnection(self.master).get_buckets()
1123        for bucket in self.buckets:
1124            keys = RestConnection(self.master).get_active_key_count(bucket.name)
1125        skip_lines = 0
1126        if self.import_file and self.import_file.startswith("csv"):
1127            skip_lines = 1
1128        limit_lines = ""
1129        data_import = ""
1130        if "docs" in options:
1131            data_import = int(options["docs"])
1132        if self.skip_docs:
1133            data_import = int(options["docs"]) - int(self.skip_docs)
1134            skip_lines += int(self.skip_docs)
1135        if self.limit_docs:
1136            data_import = int(self.limit_docs)
1137            limit_lines = int(self.limit_docs)
1138        if self.skip_rows:
1139            data_import = int(options["docs"]) - int(self.skip_rows)
1140            skip_lines += int(self.skip_rows)
1141        if self.limit_rows:
1142            data_import = int(self.limit_rows)
1143            limit_lines = int(self.limit_rows)
1144        if self.dgm_run:
1145            keys = int(keys) - int(self.pre_imex_ops_keys)
1146
1147        if not self.json_invalid_errors:
1148            print "Total docs in bucket: ", keys
1149            print "Docs need to import: ", data_import
1150
1151        if data_import != int(keys):
1152            if self.skip_docs:
1153                self.fail("Import failed to skip %s docs" % self.skip_docs)
1154            if self.limit_docs:
1155                self.fail("Import failed to limit %s docs" % self.limit_docs)
1156            if self.limit_rows:
1157                self.fail("Import failed to limit %s rows" % self.limit_rows)
1158            if self.skip_rows:
1159                self.fail("Import failed to skip %s rows" % self.limit_rows)
1160            else:
1161                self.fail("Import data does not match with bucket data")
1162
1163        if self.verify_data:
1164            if self.field_substitutions:
1165                self.log.info("Check key format %s ")
1166                self.keys_check = []
1167                for i in range(10):
1168                    self.keys_check.append(self.rest.get_random_key("default"))
1169                if self.debug_logs:
1170                    print "keys:   ", self.keys_check
1171                for x in self.keys_check:
1172                    if self.field_substitutions == "age":
1173                      if not x["key"].split("::")[1].isdigit():
1174                          self.fail("Field substitutions failed to work")
1175                    if self.field_substitutions == "name":
1176                        if not x["key"].split("::")[1].startswith("pymc"):
1177                            self.fail("Field substitutions failed to work")
1178
1179            if self.imex_type == "json":
1180                export_file = self.tmp_path + "bucket_data"
1181                export_file_cmd = self.tmp_path_raw + "bucket_data"
1182                self.shell.execute_command("rm -rf %s " % export_file)
1183                cmd = "%scbexport%s %s -c %s -u %s -p '%s' -b %s -f %s -o %s"\
1184                              % (self.cli_command_path, self.cmd_ext,
1185                                 self.imex_type,
1186                                 self.master.ip, "cbadminbucket", "password",
1187                                 "default", self.format_type, export_file_cmd)
1188                output, error = self.shell.execute_command(cmd)
1189                self.shell.log_command_output(output, error)
1190            format_type = "json"
1191            if self.imex_type == "csv":
1192                if self.field_separator == "comma":
1193                    format_type = "csv_comma"
1194                else:
1195                    format_type = "csv_tab"
1196            with open("resources/imex/%s" % self.import_file) as f:
1197                if self.skip_docs and self.limit_docs:
1198                    self.log.info("Skip %s docs and import only %s docs after that"
1199                                               % (self.skip_docs, self.limit_docs))
1200                    src_data = list(itertools.islice(f, self.skip_docs,
1201                                                        skip_lines +
1202                                                        int(self.limit_docs)))
1203                    src_data = map(lambda s: s.strip(), src_data)
1204                    src_data = [x.replace(" ", "") for x in src_data]
1205                elif self.skip_rows and self.limit_rows:
1206                    self.log.info("Skip %s rows and import only %s rows after that"
1207                                  % (self.skip_rows, self.limit_rows))
1208                    src_data = list(itertools.islice(f, (self.skip_rows + 1),
1209                                                        skip_lines +
1210                                                        int(self.limit_rows)))
1211                elif self.skip_docs:
1212                    self.log.info("Get data from %dth line" % skip_lines)
1213                    src_data = f.read().splitlines()[skip_lines:]
1214                elif self.limit_docs:
1215                    self.log.info("Get limit data to %d lines" % limit_lines)
1216                    src_data = f.read().splitlines()[:limit_lines]
1217                elif self.limit_rows:
1218                    actual_lines = limit_lines + 1
1219                    self.log.info("Get limit data to %d lines" % limit_lines)
1220                    src_data = f.read().splitlines()[1:actual_lines]
1221                elif self.skip_rows:
1222                    self.log.info("Get data from %dth lines" % skip_lines)
1223                    src_data = f.read().splitlines()[skip_lines:]
1224                else:
1225                    self.log.info("Get data from source file")
1226                    src_data = f.read().splitlines()[skip_lines:]
1227            src_data = [x.replace(" ", "") for x in src_data]
1228            src_data = [x.rstrip(",") for x in src_data]
1229            src_data[0] = src_data[0].replace("[", "")
1230            src_data[len(src_data)-1] = src_data[len(src_data)-1].replace("]", "")
1231
1232            if self.imex_type == "json":
1233                self.log.info("Copy bucket data from remote to local")
1234                if os.path.exists("/tmp/%s" % self.master.ip):
1235                    shutil.rmtree("/tmp/%s" % self.master.ip)
1236                os.makedirs("/tmp/%s" % self.master.ip)
1237                self.shell.copy_file_remote_to_local(export_file,
1238                                              "/tmp/%s/bucket_data" % self.master.ip)
1239                with open("/tmp/%s/bucket_data" % self.master.ip) as f:
1240                    bucket_data = f.read().splitlines()
1241                    bucket_data = [x.replace(" ", "") for x in bucket_data]
1242                    bucket_data = [x.rstrip(",") for x in bucket_data]
1243                    bucket_data[0] = bucket_data[0].replace("[", "")
1244                    bucket_data[len(bucket_data) - 1] = \
1245                    bucket_data[len(bucket_data) - 1].replace("]", "")
1246                if self.debug_logs:
1247                    print "\nsource data  \n", src_data
1248                    print "\nbucket data  \n", bucket_data
1249                self.log.info("Compare source data and bucket data")
1250                if sorted(src_data) == sorted(bucket_data):
1251                    self.log.info("Import data match bucket data")
1252                    if os.path.exists("/tmp/%s" % self.master.ip):
1253                        self.log.info("Remove data in slave")
1254                        shutil.rmtree("/tmp/%s" % self.master.ip)
1255                else:
1256                    self.fail("Import data does not match bucket data")
1257            elif self.imex_type == "csv":
1258                self.log.info("Verify csv import data")
1259                shell = RemoteMachineShellConnection(self.master)
1260                curl_cmd = "curl -g -X GET -u Administrator:password " \
1261                      "http://%s:8091/pools/default/buckets/default/docs?" \
1262                      "include_docs=false&skip=0" % self.master.ip
1263                output, error = shell.execute_command(curl_cmd)
1264
1265                bucket_keys = ast.literal_eval(output[0])
1266                bucket_keys = bucket_keys["rows"]
1267                for x in range(0, len(src_data)):
1268                    if self.debug_logs:
1269                        print "source data:  ", src_data[x].split(",")[2]
1270                        print "bucket data:  \n", bucket_keys[x]["id"]
1271                    if not any(str(src_data[x].split(",")[2]) in\
1272                                                    k["id"] for k in bucket_keys):
1273                        self.fail("Failed to import key %s to bucket"
1274                                  % src_data[x])
1275                    curl_cmd = "curl -g -X GET -u Administrator:password " \
1276                            "http://%s:8091/pools/default/buckets/default/docs/%d" \
1277                               % (self.master.ip, x)
1278                    if self.omit_empty:
1279                        empty_data_keys = [2, 6, 100, 500, 750, 888]
1280                        if x in empty_data_keys:
1281                            output, error = shell.execute_command(curl_cmd)
1282                            if output:
1283                                key_value = output[0]
1284                                key_value = ast.literal_eval(key_value)
1285                                print "key value json  ", key_value["json"]
1286                                if "age" in key_value["json"]:
1287                                    if "body" in key_value["json"]:
1288                                        self.fail("Failed to omit empty value field")
1289                    if self.infer_types:
1290                        print_cmd = False
1291                        if self.debug_logs:
1292                            print_cmd = True
1293                        output, error = shell.execute_command(curl_cmd, debug=print_cmd)
1294                        if output:
1295                            key_value = output[0]
1296                            key_value = ast.literal_eval(key_value)
1297                            if not isinstance( key_value["json"]["age"], int):
1298                                self.fail("Failed to put inferred type into docs %s"
1299                                          % src_data[x])
1300
1301    def _verify_export_file(self, export_file_name, options):
1302        if not options["load_doc"]:
1303            if "bucket" in options and options["bucket"] == "empty":
1304                output, error = self.shell.execute_command("ls %s" % self.ex_path)
1305                if export_file_name in output[0]:
1306                    self.log.info("check if export file %s is empty"
1307                                                                % export_file_name)
1308                    output, error = self.shell.execute_command("cat %s%s"\
1309                                                 % (self.ex_path, export_file_name))
1310                    if output:
1311                        self.fail("file %s should be empty" % export_file_name)
1312                else:
1313                    self.fail("Fail to export.  File %s does not exist"
1314                                                            % export_file_name)
1315        elif options["load_doc"]:
1316            found = self.shell.file_exists(self.ex_path, export_file_name)
1317            if found:
1318                self.log.info("copy export file from remote to local")
1319                if os.path.exists("/tmp/export{0}".format(self.master.ip)):
1320                    shutil.rmtree("/tmp/export{0}".format(self.master.ip))
1321                os.makedirs("/tmp/export{0}".format(self.master.ip))
1322                self.shell.copy_file_remote_to_local(self.ex_path+export_file_name,
1323                                                    "/tmp/export{0}/".format(self.master.ip) \
1324                                                    + export_file_name)
1325                self.log.info("compare 2 json files")
1326                if self.format_type == "lines":
1327                    sample_file = open("resources/imex/json_%s_lines" % options["docs"])
1328                    samples = sample_file.read().splitlines()
1329                    samples = [x.replace(" ", "") for x in samples]
1330                    export_file = open("/tmp/export{0}/".format(self.master.ip)\
1331                                                             + export_file_name)
1332
1333                    exports = export_file.read().splitlines()
1334                    for x in range(len(exports)):
1335                        tmp = exports[x].split(",")
1336                        """ add leading zero to name value
1337                            like pymc39 to pymc039
1338                        """
1339                        tmp1 = tmp[0][13:-1].zfill(3)
1340                        tmp[0] = tmp[0][:13] + tmp1 + '"'
1341                        exports[x] = ",".join(tmp)
1342
1343                    if self.debug_logs:
1344                        s = set(exports)
1345                        not_in_exports = [x for x in samples if x not in s]
1346                        print "\n data in sample not in exports  ", not_in_exports
1347                        e = set(samples)
1348                        not_in_samples = [x for x in exports if x not in e]
1349                        print "\n data in exports not in samples  ", not_in_samples
1350                    if sorted(samples) == sorted(exports):
1351                        self.log.info("export and sample json mathch")
1352                    else:
1353                        self.fail("export and sample json does not match")
1354                    sample_file.close()
1355                    export_file.close()
1356                    self.log.info("remove file /tmp/export{0}".format(self.master.ip))
1357                    shutil.rmtree("/tmp/export{0}".format(self.master.ip))
1358                elif self.format_type == "list":
1359                    sample_file = open("resources/imex/json_list_%s_lines"\
1360                                                                 % options["docs"])
1361                    samples = sample_file.read()
1362                    samples = ast.literal_eval(samples)
1363                    samples.sort(key=lambda k: k['name'])
1364                    export_file = open("/tmp/export{0}/".format(self.master.ip)\
1365                                                             + export_file_name)
1366                    exports = export_file.read()
1367                    exports = ast.literal_eval(exports)
1368                    exports.sort(key=lambda k: k['name'])
1369
1370                    if self.debug_logs:
1371                        print "\nSample list data: %s" % samples
1372                        print "\nExport list data: %s" % exports
1373                    if samples == exports:
1374                        self.log.info("export and sample json files are matched")
1375                    else:
1376                        self.fail("export and sample json files did not match")
1377                    sample_file.close()
1378                    export_file.close()
1379                    self.log.info("remove file /tmp/export{0}".format(self.master.ip))
1380                    shutil.rmtree("/tmp/export{0}".format(self.master.ip))
1381            else:
1382                file_exist = True
1383                if self.secure_conn and self.no_cacert:
1384                    file_exist = False
1385                if file_exist:
1386                    self.fail("There is not export file '%s' in %s%s"\
1387                             % (export_file_name, self.ex_path, export_file_name))
1388
1389    def _check_output(self, word_check, output):
1390        found = False
1391        if len(output) >=1 :
1392            for x in output:
1393                if word_check.lower() in x.lower():
1394                    self.log.info("Found \"%s\" in CLI output" % word_check)
1395                    found = True
1396                    break
1397        return found
1398
1399    def _remote_copy_import_file(self, import_file):
1400        import_method = self.input.param("import_method", "file://")
1401        if "url" in import_method:
1402            import_method = ""
1403        if import_method != "":
1404            self.im_path = self.tmp_path + "import/"
1405            self.log.info("copy import file from local to remote")
1406            output, error = self.shell.execute_command("ls %s " % self.tmp_path)
1407            if self._check_output("import", output):
1408                self.log.info("remove %simport directory" % self.tmp_path)
1409                self.shell.execute_command("rm -rf  %simport " % self.tmp_path)
1410                output, error = self.shell.execute_command("ls %s " % self.tmp_path)
1411                if self._check_output("import", output):
1412                    self.fail("fail to delete import dir ")
1413            self.shell.execute_command("mkdir  %simport " % self.tmp_path)
1414            if import_file is not None:
1415                self.src_file = "resources/imex/"+ import_file
1416            else:
1417                self.fail("Need import_file param")
1418            self.des_file = self.im_path + import_file
1419            self.shell.copy_file_local_to_remote(self.src_file, self.des_file)
1420        else:
1421            self.des_file = self.import_file
1422
1423    def _check_output_option_flags(self, output, errors_path, logs_path):
1424        error_check = True
1425        if  self.input.param("threads_flag", "") == "empty":
1426            error_check = False
1427            if "Expected argument for option: -t" in output:
1428                self.log.info("%s detected empty value of threads argument"
1429                                                          % self.test_type)
1430            else:
1431                self.fail("%s could not detect empty value of argument"
1432                                                       % self.test_type)
1433        elif self.threads_flag == "notnumber":
1434            error_check = False
1435            if output and "Unable to process value for flag: -t" in output[0]:
1436                self.log.info("%s detected incorrect value of threads argument"
1437                                                                 % self.test_type)
1438            else:
1439                self.fail("%s could not detect incorrect value of argument"
1440                                                            % self.test_type)
1441        if self.errors_flag == "empty":
1442            error_check = False
1443            if "Expected argument for option: -e" in output:
1444                self.log.info("%s detected empty value of error argument"
1445                                                        % self.test_type)
1446            else:
1447                self.fail("%s could not detect empty value of argument"
1448                                                     % self.test_type)
1449        elif self.errors_flag == "relative_path":
1450            output, error = self.shell.execute_command("ls %s " % self.root_path)
1451            if self._check_output(errors_path[2:], output):
1452                error_check = False
1453                self.log.info("%s error file created" % self.test_type)
1454            else:
1455                self.fail("%s failed to create error file in log flag"
1456                                                        % self.test_type)
1457        elif self.errors_flag == "absolute_path":
1458            output, error = self.shell.execute_command("ls %s " % errors_path)
1459            if self._check_output("error", output):
1460                error_check = False
1461                self.log.info("%s error file created" % self.test_type)
1462            else:
1463                self.fail("%s failed to create error file in log flag"
1464                                                    % self.test_type)
1465        elif self.errors_flag != "":
1466            output, error = self.shell.execute_command("ls %s " % errors_path)
1467            if self._check_output(errors_path, output):
1468                error_check = False
1469                self.log.info("%s error file created" % self.test_type)
1470            else:
1471                self.fail("%s failed to create error file in error flag"
1472                                                        % self.test_type)
1473        if self.logs_flag == "empty":
1474            error_check = False
1475            if "Expected argument for option: -l" in output:
1476                self.log.info("%s detected empty value of log argument"
1477                                                        % self.test_type)
1478            else:
1479                self.fail("%s could not detect empty value of logs argument"
1480                                                            % self.test_type)
1481        elif self.logs_flag == "relative_path":
1482            output, error = self.shell.execute_command("ls %s " % self.root_path)
1483            if self._check_output(logs_path[2:], output):
1484                error_check = False
1485                self.log.info("%s log file created" % self.test_type)
1486            else:
1487                self.fail("%s failed to create log file in log flag"
1488                                                        % self.test_type)
1489        elif self.logs_flag == "absolute_path":
1490            output, error = self.shell.execute_command("ls %s " % self.cli_command_path)
1491            if self._check_output("log", output):
1492                error_check = False
1493                self.log.info("%s log file created" % self.test_type)
1494            else:
1495                self.fail("%s failed to create log file in log flag"
1496                                                        % self.test_type)
1497        return error_check
1498