import json

from lib.couchbase_helper.documentgenerator import BlobGenerator, JsonDocGenerator, JSONNonDocGenerator
from lib.membase.api.rest_client import RestConnection
from lib.testconstants import STANDARD_BUCKET_PORT
from pytests.eventing.eventing_constants import HANDLER_CODE, HANDLER_CODE_ERROR
from pytests.eventing.eventing_base import EventingBaseTest
import logging

log = logging.getLogger()


class EventingNegative(EventingBaseTest):
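    """Negative tests for the Eventing service.

    Covers invalid deployments (missing/memcached/identical buckets, bad names,
    aliases and prefixes), bucket flush/delete while handlers are processing
    mutations, and lifecycle operations issued in the wrong state."""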
    def setUp(self):
        super(EventingNegative, self).setUp()
        if self.create_functions_buckets:
            self.bucket_size = 100
            log.info("bucket size: %s", self.bucket_size)
            bucket_params = self._create_bucket_params(server=self.server, size=self.bucket_size,
                                                       replicas=self.num_replicas)
            self.cluster.create_standard_bucket(name=self.src_bucket_name, port=STANDARD_BUCKET_PORT + 1,
                                                bucket_params=bucket_params)
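            # only the source bucket exists at this point, so this list holds just src_bucket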
            self.src_bucket = RestConnection(self.master).get_buckets()
            self.cluster.create_standard_bucket(name=self.dst_bucket_name, port=STANDARD_BUCKET_PORT + 1,
                                                bucket_params=bucket_params)
            self.cluster.create_standard_bucket(name=self.metadata_bucket_name, port=STANDARD_BUCKET_PORT + 1,
                                                bucket_params=bucket_params)
            self.buckets = RestConnection(self.master).get_buckets()
        self.gens_load = self.generate_docs(self.docs_per_day)
        self.expiry = 3

    def tearDown(self):
        super(EventingNegative, self).tearDown()

    def test_delete_function_when_function_is_in_deployed_state_and_which_is_already_deleted(self):
        self.load(self.gens_load, buckets=self.src_bucket, flag=self.item_flag, verify_data=False,
                  batch_size=self.batch_size)
        body = self.create_save_function_body(self.function_name, HANDLER_CODE.BUCKET_OPS_ON_UPDATE, worker_count=3)
        self.deploy_function(body)
        # Wait for eventing to catch up with all the create mutations and verify results
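        # generate_docs(self.docs_per_day) creates docs_per_day * 2016 documents
        # (2016 = the number of five-minute intervals in a week), hence the expected count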
        self.verify_eventing_results(self.function_name, self.docs_per_day * 2016)
        # Try deleting a function which is still in deployed state
        try:
            self.delete_function(body)
        except Exception as ex:
            log.info("output from delete API before undeploying function: {0}".format(str(ex)))
            message = "Skipping delete request from primary store for app: {0} as it hasn't been undeployed".format(
                self.function_name)
            if message not in str(ex):
                self.fail("Deleting a deployed function failed with an unexpected error: {0}".format(str(ex)))
        else:
            self.fail("Function delete succeeded even when the function was in deployed state")
        self.undeploy_and_delete_function(body)
        try:
            # Try deleting a function which is already deleted
            self.delete_function(body)
        except Exception as ex:
            message = "App: {0} not deployed".format(self.function_name)
            if message not in str(ex):
                self.fail("Deleting an already deleted function failed with an unexpected error: {0}".format(str(ex)))
        else:
            self.fail("Function delete succeeded even when the function was already deleted")

    def test_deploy_function_where_source_metadata_and_destination_buckets_dont_exist(self):
        # delete source, metadata and destination buckets
        for bucket in self.buckets:
            self.rest.delete_bucket(bucket.name)
        body = self.create_save_function_body(self.function_name, HANDLER_CODE.BUCKET_OPS_ON_UPDATE, worker_count=3)
        try:
            self.rest.save_function(body['appname'], body)
            self.rest.deploy_function(body['appname'], body)
        except Exception as ex:
            if "ERR_BUCKET_MISSING" not in str(ex):
                self.fail("Function save/deploy failed with an unexpected error: {0}".format(str(ex)))
        else:
            self.fail("Function save/deploy succeeded even when src/dst/metadata buckets don't exist")

    def test_deploy_function_where_source_and_metadata_buckets_are_same(self):
        self.load(self.gens_load, buckets=self.src_bucket, flag=self.item_flag, verify_data=False,
                  batch_size=self.batch_size)
        body = self.create_save_function_body(self.function_name, HANDLER_CODE.BUCKET_OPS_ON_UPDATE, worker_count=3)
        # set both src and metadata bucket to the same bucket
        body['depcfg']['metadata_bucket'] = self.src_bucket_name
        try:
            self.rest.save_function(body['appname'], body)
            # Try to deploy the function
            self.rest.deploy_function(body['appname'], body)
        except Exception as ex:
            if "Source bucket same as metadata bucket" not in str(ex):
                self.fail("Function save/deploy failed with an unexpected error: {0}".format(str(ex)))
        else:
            self.fail("Eventing function allowed source and metadata bucket to be the same")

    def test_eventing_with_memcached_buckets(self):
        # delete the couchbase buckets that were created as part of setup
        for bucket in self.buckets:
            self.rest.delete_bucket(bucket.name)
        # create memcached buckets with the same names
        bucket_params = self._create_bucket_params(server=self.server, size=self.bucket_size,
                                                   replicas=self.num_replicas)
        tasks = []
        for bucket in self.buckets:
            tasks.append(self.cluster.async_create_memcached_bucket(name=bucket.name,
                                                                    port=STANDARD_BUCKET_PORT + 1,
                                                                    bucket_params=bucket_params))
        for task in tasks:
            task.result()
        body = self.create_save_function_body(self.function_name, HANDLER_CODE.BUCKET_OPS_ON_UPDATE, worker_count=3)
        try:
            self.rest.save_function(body['appname'], body)
            self.rest.deploy_function(body['appname'], body)
        except Exception as ex:
            if "ERR_SOURCE_BUCKET_MEMCACHED" not in str(ex):
                self.fail("Function save/deploy failed with an unexpected error: {0}".format(str(ex)))
        else:
            self.fail("Eventing function allowed the source bucket to be a memcached bucket")

    def test_src_metadata_and_dst_bucket_flush_when_eventing_is_processing_mutations(self):
        body = self.create_save_function_body(self.function_name, HANDLER_CODE.BUCKET_OPS_WITH_DOC_TIMER)
        self.deploy_function(body)
        self.load(self.gens_load, buckets=self.src_bucket, flag=self.item_flag, verify_data=False,
                  batch_size=self.batch_size)
        # flush source, metadata and destination buckets while eventing is processing mutations
        for bucket in self.buckets:
            self.rest.flush_bucket(bucket.name)
        # Undeploy and delete the function. In case of a flush, functions are not undeployed automatically
        self.undeploy_and_delete_function(body)
        # check if all the eventing-consumers are cleaned up
        # Validation of any issues such as panics is taken care of by the tearDown method
        self.assertTrue(self.check_if_eventing_consumers_are_cleaned_up(),
                        msg="eventing-consumer processes are not cleaned up even after undeploying the function")

    # See MB-30377
    def test_src_metadata_and_dst_bucket_delete_when_eventing_is_processing_mutations(self):
        body = self.create_save_function_body(self.function_name, HANDLER_CODE.BUCKET_OPS_WITH_DOC_TIMER)
        self.deploy_function(body)
        self.load(self.gens_load, buckets=self.src_bucket, flag=self.item_flag, verify_data=False,
                  batch_size=self.batch_size)
        # delete source, metadata and destination buckets while eventing is processing mutations
        for bucket in self.buckets:
            self.log.info("deleting bucket: %s", bucket.name)
            self.rest.delete_bucket(bucket.name)
        # Wait for the function to get undeployed automatically
        self.wait_for_undeployment(body['appname'])
        # Delete the function
        self.delete_function(body)
        self.sleep(60)
        # check if all the eventing-consumers are cleaned up
        # Validation of any issues such as panics is taken care of by the tearDown method
        self.assertTrue(self.check_if_eventing_consumers_are_cleaned_up(),
                        msg="eventing-consumer processes are not cleaned up even after undeploying the function")

    # MB-30377
    def test_src_bucket_delete_when_eventing_is_processing_mutations(self):
        body = self.create_save_function_body(self.function_name, HANDLER_CODE.BUCKET_OPS_WITH_DOC_TIMER)
        self.deploy_function(body)
        self.load(self.gens_load, buckets=self.src_bucket, flag=self.item_flag, verify_data=False,
                  batch_size=self.batch_size)
        # delete only the source bucket while eventing is processing mutations
        for bucket in self.buckets:
            if bucket.name == self.src_bucket_name:
                self.log.info("deleting bucket: %s", bucket.name)
                self.rest.delete_bucket(bucket.name)
        # Wait for the function to get undeployed automatically
        self.wait_for_undeployment(body['appname'])
        # Delete the function
        self.delete_function(body)
        self.sleep(60)
        # check if all the eventing-consumers are cleaned up
        # Validation of any issues such as panics is taken care of by the tearDown method
        self.assertTrue(self.check_if_eventing_consumers_are_cleaned_up(),
                        msg="eventing-consumer processes are not cleaned up even after undeploying the function")

    # MB-29533 and MB-31545
    def test_metadata_bucket_delete_when_eventing_is_processing_mutations(self):
        body = self.create_save_function_body(self.function_name, HANDLER_CODE.BUCKET_OPS_WITH_DOC_TIMER)
        self.deploy_function(body)
        self.load(self.gens_load, buckets=self.src_bucket, flag=self.item_flag, verify_data=False,
                  batch_size=self.batch_size)
        # delete only the metadata bucket while eventing is processing mutations
        for bucket in self.buckets:
            if bucket.name == self.metadata_bucket_name:
                self.log.info("deleting bucket: %s", bucket.name)
                self.rest.delete_bucket(bucket.name)
        # Wait for the function to get undeployed automatically
        self.wait_for_undeployment(body['appname'])
        # Delete the function
        self.delete_function(body)
        self.sleep(60)
        # check if all the eventing-consumers are cleaned up
        # Validation of any issues such as panics is taken care of by the tearDown method
        self.assertTrue(self.check_if_eventing_consumers_are_cleaned_up(),
                        msg="eventing-consumer processes are not cleaned up even after undeploying the function")

    def test_undeploy_when_function_is_still_in_bootstrap_state(self):
        self.load(self.gens_load, buckets=self.src_bucket, flag=self.item_flag, verify_data=False,
                  batch_size=self.batch_size)
        body = self.create_save_function_body(self.function_name, HANDLER_CODE.BUCKET_OPS_ON_UPDATE, worker_count=3)
        self.deploy_function(body, wait_for_bootstrap=False)
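        # wait_for_bootstrap=False returns immediately, so the function is still bootstrapping here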
        retry_body = {"count": 1}
        # Set retry count to 1
        self.rest.set_eventing_retry(body['appname'], retry_body)
        try:
            # Try undeploying the function while it is still bootstrapping
            self.undeploy_function(body)
        except Exception as ex:
            if "not bootstrapped. Operation not permitted. Edit function instead" not in str(ex):
                self.fail("Function undeploy failed with an unexpected error: {0}".format(str(ex)))
        self.undeploy_and_delete_function(body)

    def test_function_where_handler_code_takes_more_time_to_execute_than_execution_timeout(self):
        # Note to self: never use SDKs unless you really have to. It is difficult to maintain correct
        # SDK versions on the slaves, and scripts become notoriously unreliable on Jenkins slaves.
        num_docs = 10
        values = ['1', '10']
        # create 10 non json docs on the source bucket
        gen_load_non_json = JSONNonDocGenerator('non_json_docs', values, start=0, end=num_docs)
        self.cluster.load_gen_docs(self.master, self.src_bucket_name, gen_load_non_json, self.buckets[0].kvs[1],
                                   'create', compression=self.sdk_compression)
        # create a function whose handler takes longer to execute than the configured execution_timeout
        body = self.create_save_function_body(self.function_name, HANDLER_CODE_ERROR.EXECUTION_TIME_MORE_THAN_TIMEOUT,
                                              execution_timeout=30)
        # deploy the function
        self.deploy_function(body)
        # This wait is intentional: give eventing time to attempt the mutations so we can verify none are processed
        self.sleep(60)
        # No docs should be present in dst_bucket as all the function executions should have timed out
        self.verify_eventing_results(self.function_name, 0, skip_stats_validation=True)
        eventing_nodes = self.get_nodes_from_services_map(service_type="eventing", get_all_nodes=True)
        exec_timeout_count = 0
        for eventing_node in eventing_nodes:
            rest_conn = RestConnection(eventing_node)
            out = rest_conn.get_all_eventing_stats()
            # sum the timeout_count across all eventing nodes
            exec_timeout_count += out[0]["failure_stats"]["timeout_count"]
        # the number of timed-out executions should equal the number of docs created
        if exec_timeout_count != num_docs:
            self.fail("Not all event executions timed out : Expected : {0} Actual : {1}".format(num_docs,
                                                                                                exec_timeout_count))
        self.undeploy_and_delete_function(body)

    def test_syntax_error(self):
        self.load(self.gens_load, buckets=self.src_bucket, flag=self.item_flag, verify_data=False,
                  batch_size=self.batch_size)
        body = self.create_save_function_body(self.function_name, HANDLER_CODE.SYNTAX_ERROR)
        try:
            self.deploy_function(body, deployment_fail=True)
        except Exception as e:
            if "Unexpected end of input" not in str(e):
                self.fail("Deployment was expected to fail with a syntax error, but got: {0}".format(str(e)))

    def test_read_binary_data_from_the_function(self):
        gen_load_binary = BlobGenerator('binary1000000', 'binary', self.value_size, start=1,
                                        end=2016 * self.docs_per_day + 1)
        gen_load_json = JsonDocGenerator('binary', op_type="create", end=2016 * self.docs_per_day)
        # load json docs on the src bucket and binary docs on the dst bucket with identical keys,
        # so the handler can look up the dst doc for each src mutation
        self.cluster.load_gen_docs(self.master, self.src_bucket_name, gen_load_json, self.buckets[0].kvs[1], "create",
                                   exp=0, flag=0, batch_size=1000, compression=self.sdk_compression)
        self.cluster.load_gen_docs(self.master, self.dst_bucket_name, gen_load_binary, self.buckets[0].kvs[1], "create",
                                   exp=0, flag=0, batch_size=1000, compression=self.sdk_compression)
        body = self.create_save_function_body(self.function_name, HANDLER_CODE.READ_BUCKET_OP_ON_DST)
        self.deploy_function(body)
        # wait for some time so that bucket_op_exception_count increases,
        # because binary data cannot be read from handler code
        self.sleep(60)
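        # get_all_eventing_stats returns one entry per deployed function; index 0 is our only function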
        stats = self.rest.get_all_eventing_stats()
        bucket_op_exception_count = stats[0]["failure_stats"]["bucket_op_exception_count"]
        self.undeploy_and_delete_function(body)
        log.info("stats : {0}".format(json.dumps(stats, sort_keys=True, indent=4)))
        if bucket_op_exception_count == 0:
            self.fail("Reading binary data succeeded from handler code")

    def test_deploy_function_name_with_more_than_100_chars(self):
        # create a function name longer than 100 chars
        function_name = "a" * 101
        body = self.create_save_function_body(function_name, HANDLER_CODE.BUCKET_OPS_WITH_DOC_TIMER)
        try:
            self.deploy_function(body, deployment_fail=True)
        except Exception as e:
            if "Function name length must be less than 100" not in str(e):
                self.fail("Deployment was expected to fail for a function name longer than 100 chars, "
                          "but got: {0}".format(str(e)))

    def test_deploy_function_name_with_special_chars(self):
        # use a function name with spaces and other special chars
        body = self.create_save_function_body(self.function_name, HANDLER_CODE.BUCKET_OPS_WITH_DOC_TIMER)
        body['appname'] = "a b c @ # $ % ^ & * ( ) + ="
        try:
            self.rest.create_function("abc", body)
        except Exception as e:
            if "Function name can only contain characters in range A-Z, a-z, 0-9 and underscore, hyphen" not in str(e):
                self.fail("Function creation was expected to fail for a name with spaces/special chars, "
                          "but got: {0}".format(str(e)))

    def test_deploy_function_invalid_alias_name(self):
        body = self.create_save_function_body(self.function_name, HANDLER_CODE.BUCKET_OPS_WITH_DOC_TIMER)
        # Use an invalid bucket alias
        body['depcfg']['buckets'].append({"alias": "908!@#$%%^&&**", "bucket_name": self.dst_bucket_name})
        try:
            self.deploy_function(body, deployment_fail=True)
        except Exception as e:
            if "ERR_INVALID_CONFIG" not in str(e):
                log.info(str(e))
                self.fail("Deployment was expected to fail for an invalid bucket alias, but got: {0}".format(str(e)))

    def test_deploy_function_with_prefix_length_greater_than_16_chars(self):
        body = self.create_save_function_body(self.function_name, HANDLER_CODE.BUCKET_OPS_WITH_DOC_TIMER)
        # Use a user_prefix longer than 16 chars
        body['settings']['user_prefix'] = "eventingeventingeventingeventingeventingeventingeventingeventingeventing"
        try:
            self.deploy_function(body, deployment_fail=True)
        except Exception as e:
            if "ERR_INVALID_CONFIG" not in str(e):
                log.info(str(e))
                self.fail("Deployment was expected to fail for a user_prefix longer than 16 chars, "
                          "but got: {0}".format(str(e)))