1import unittest
2import logger
3import random
4import time
5import json
6import datetime
7from threading import Thread, Event
8from TestInput import TestInputSingleton
9from basetestcase import BaseTestCase
10from membase.api.rest_client import RestConnection
11from membase.helper.bucket_helper import BucketOperationHelper
12from remote.remote_util import RemoteMachineShellConnection
13from couchbase_helper.documentgenerator import BlobGenerator
14from memcached.helper.data_helper import MemcachedClientHelper, VBucketAwareMemcached
15from testconstants import MIN_COMPACTION_THRESHOLD
16from testconstants import MAX_COMPACTION_THRESHOLD
17
18
class AutoCompactionTests(BaseTestCase):
    """Functional tests for Couchbase bucket auto-compaction.

    Covers threshold validation, fragmentation/size/time-window triggered
    compaction, start/stop (cancel) of compaction, and compaction behaviour
    during rebalance operations.
    """

    # Class-level placeholders; populated by BaseTestCase.setUp() machinery.
    servers = None
    clients = None
    log = None
    input = None

    def setUp(self):
        """Read test params and build the load/update doc generators."""
        super(AutoCompactionTests, self).setUp()
        # fragmentation percentage at which auto-compaction should trigger
        self.autocompaction_value = self.input.param("autocompaction_value", 0)
        # set by worker threads to signal that a load/cancel operation failed
        self.is_crashed = Event()
        # optional mid-test operation: "change_port" or "change_password"
        self.during_ops = self.input.param("during_ops", None)
        self.gen_load = BlobGenerator('compact', 'compact-', self.value_size, start=0, end=self.num_items)
        # updates touch only half the keys to create fragmentation
        self.gen_update = BlobGenerator('compact', 'compact-', self.value_size, start=0, end=(self.num_items / 2))
33
34    @staticmethod
35    def insert_key(serverInfo, bucket_name, count, size):
36        rest = RestConnection(serverInfo)
37        smart = VBucketAwareMemcached(rest, bucket_name)
38        for i in xrange(count * 1000):
39            key = "key_" + str(i)
40            flag = random.randint(1, 999)
41            value = {"value" : MemcachedClientHelper.create_value("*", size)}
42            smart.memcached(key).set(key, 0, 0, json.dumps(value))
43
44    def load(self, server, compaction_value, bucket_name, gen):
45        monitor_fragm = self.cluster.async_monitor_db_fragmentation(server, compaction_value, bucket_name)
46        end_time = time.time() + self.wait_timeout * 50
47        # generate load until fragmentation reached
48        while monitor_fragm.state != "FINISHED":
49            if self.is_crashed.is_set():
50                self.cluster.shutdown(force=True)
51                return
52            if end_time < time.time():
53                self.fail("Fragmentation level is not reached in %s sec" % self.wait_timeout * 50)
54            # update docs to create fragmentation
55            try:
56                self._load_all_buckets(server, gen, "update", 0)
57            except Exception, ex:
58                self.is_crashed.set()
59                self.log.error("Load cannot be performed: %s" % str(ex))
60        monitor_fragm.result()
61
    def test_database_fragmentation(self):
        """Validate dbFragmentThresholdPercentage handling end to end.

        Out-of-range thresholds (<= MIN_COMPACTION_THRESHOLD or
        >= MAX_COMPACTION_THRESHOLD) must be rejected by the REST API with an
        'Allowed range is 2 - 100' error. Valid thresholds must trigger an
        auto-compaction run once update load drives fragmentation past the
        threshold, optionally while the REST port/password is changed mid-test.
        """
        BucketOperationHelper.delete_all_buckets_or_assert(self.servers, self)
        percent_threshold = self.autocompaction_value
        bucket_name = "default"
        MAX_RUN = 100
        item_size = 1024
        # updates use proportionally smaller values so fragmentation grows
        update_item_size = item_size * ((float(100 - percent_threshold)) / 100)
        serverInfo = self.servers[0]
        self.log.info(serverInfo)
        rest = RestConnection(serverInfo)
        remote_client = RemoteMachineShellConnection(serverInfo)
        # returns (success_flag, response_body, headers)
        output, rq_content, header = rest.set_auto_compaction("false", dbFragmentThresholdPercentage=percent_threshold, viewFragmntThresholdPercentage=None)
        if not output and (percent_threshold <= MIN_COMPACTION_THRESHOLD or percent_threshold >= MAX_COMPACTION_THRESHOLD):
            # negative case: setting an out-of-range threshold must fail
            self.assertFalse(output, "it should be  impossible to set compaction value = {0}%".format(percent_threshold))
            import json
            self.assertTrue(json.loads(rq_content).has_key("errors"), "Error is not present in response")
            self.assertTrue(str(json.loads(rq_content)["errors"]).find("Allowed range is 2 - 100") > -1, \
                            "Error 'Allowed range is 2 - 100' expected, but was '{0}'".format(str(json.loads(rq_content)["errors"])))
            self.log.info("Response contains error = '%(errors)s' as expected" % json.loads(rq_content))
        elif (output and percent_threshold >= MIN_COMPACTION_THRESHOLD
                     and percent_threshold <= MAX_RUN):
            # positive case: create a bucket sized to part of the node quota,
            # load it, then update with smaller docs until compaction fires.
            # NOTE(review): upper bound here uses MAX_RUN (100) rather than
            # MAX_COMPACTION_THRESHOLD -- confirm this is intentional.
            node_ram_ratio = BucketOperationHelper.base_bucket_ratio(TestInputSingleton.input.servers)
            info = rest.get_nodes_self()
            available_ram = info.memoryQuota * (node_ram_ratio) / 2
            # items is expressed in thousands of keys (note the *1000 below)
            items = (int(available_ram * 1000) / 2) / item_size
            print "ITEMS =============%s" % items
            rest.create_bucket(bucket=bucket_name, ramQuotaMB=int(available_ram), authType='sasl',
                               saslPassword='password', replicaNumber=1, proxyPort=11211)
            BucketOperationHelper.wait_for_memcached(serverInfo, bucket_name)
            BucketOperationHelper.wait_for_vbuckets_ready_state(serverInfo, bucket_name)

            self.log.info("start to load {0}K keys with {1} bytes/key".format(items, item_size))
            #self.insert_key(serverInfo, bucket_name, items, item_size)
            generator = BlobGenerator('compact', 'compact-', int(item_size), start=0, end=(items * 1000))
            self._load_all_buckets(self.master, generator, "create", 0, 1)
            self.log.info("sleep 10 seconds before the next run")
            time.sleep(10)

            self.log.info("start to update {0}K keys with smaller value {1} bytes/key".format(items,
                                                                             int(update_item_size)))
            generator_update = BlobGenerator('compact', 'compact-', int(update_item_size), start=0, end=(items * 1000))
            if self.during_ops:
                # optionally change the REST port/password mid-test to verify
                # compaction still runs after credential/endpoint changes
                if self.during_ops == "change_port":
                    self.change_port(new_port=self.input.param("new_port", "9090"))
                    self.master.port = self.input.param("new_port", "9090")
                elif self.during_ops == "change_password":
                    old_pass = self.master.rest_password
                    self.change_password(new_password=self.input.param("new_password", "new_pass"))
                    self.master.rest_password = self.input.param("new_password", "new_pass")
                rest = RestConnection(self.master)
            # background loader keeps fragmentation growing while we wait for
            # the compaction to be observed on the server
            insert_thread = Thread(target=self.load,
                                   name="insert",
                                   args=(self.master, self.autocompaction_value,
                                         self.default_bucket_name, generator_update))
            try:
                insert_thread.start()
                compact_run = remote_client.wait_till_compaction_end(rest, bucket_name,
                                                                     timeout_in_seconds=(self.wait_timeout * 10))
                if not compact_run:
                    self.fail("auto compaction does not run")
                elif compact_run:
                    self.log.info("auto compaction run successfully")
            except Exception, ex:
                # roll back any during_ops change before deciding how to fail
                if self.during_ops:
                     if self.during_ops == "change_password":
                         self.change_password(new_password=old_pass)
                     elif self.during_ops == "change_port":
                         self.change_port(new_port='8091',
                                          current_port=self.input.param("new_port", "9090"))
                if str(ex).find("enospc") != -1:
                    # disk full: stop the loader, treated as non-fatal here.
                    # NOTE(review): Thread._Thread__stop is a private CPython
                    # API and does not reliably stop a running thread.
                    self.is_crashed.set()
                    self.log.error("Disk is out of space, unable to load more data")
                    insert_thread._Thread__stop()
                else:
                    insert_thread._Thread__stop()
                    raise ex
            else:
                insert_thread.join()
        else:
            self.log.error("Unknown error")
        if self.during_ops:
                     # NOTE(review): duplicated rollback -- this also runs after
                     # the except-branch rollback above, and 'old_pass' may be
                     # unbound if during_ops == "change_password" but the
                     # negative (rejected-threshold) branch was taken. Confirm.
                     if self.during_ops == "change_password":
                         self.change_password(new_password=old_pass)
                     elif self.during_ops == "change_port":
                         self.change_port(new_port='8091',
                                          current_port=self.input.param("new_port", "9090"))
149
150    def _viewFragmentationThreshold(self):
151        for serverInfo in self.servers:
152            self.log.info(serverInfo)
153            rest = RestConnection(serverInfo)
154            rest.set_auto_compaction(dbFragmentThresholdPercentage=80, viewFragmntThresholdPercentage=80)
155
156    def rebalance_in_with_DB_compaction(self):
157        self.disable_compaction()
158        self._load_all_buckets(self.master, self.gen_load, "create", 0, 1)
159        self._montior_DB_fragmentation()
160        servs_in = self.servers[self.nodes_init:self.nodes_in + 1]
161        rebalance = self.cluster.async_rebalance([self.master], servs_in, [])
162        self.sleep(5)
163        compaction_task = self.cluster.async_compact_bucket(self.master, self.default_bucket_name)
164        result = compaction_task.result(self.wait_timeout * 5)
165        self.assertTrue(result, "Compaction didn't finished correctly. Please check diags")
166        rebalance.result()
167        self.verify_cluster_stats(self.servers[:self.nodes_in + 1])
168
169    def rebalance_in_with_auto_DB_compaction(self):
170        remote_client = RemoteMachineShellConnection(self.master)
171        rest = RestConnection(self.master)
172        self.set_auto_compaction(rest, dbFragmentThresholdPercentage=self.autocompaction_value)
173        self._load_all_buckets(self.master, self.gen_load, "create", 0, 1)
174        self._montior_DB_fragmentation()
175        servs_in = self.servers[1:self.nodes_in + 1]
176        rebalance = self.cluster.async_rebalance([self.master], servs_in, [])
177        compact_run = remote_client.wait_till_compaction_end(rest, self.default_bucket_name,
178                                                                     timeout_in_seconds=(self.wait_timeout * 5))
179        rebalance.result()
180        monitor_fragm = self.cluster.async_monitor_db_fragmentation(self.master, 0, self.default_bucket_name)
181        result = monitor_fragm.result()
182        if compact_run:
183            self.log.info("auto compaction run successfully")
184        elif result:
185            self.log.info("Compaction is already completed")
186        else:
187            self.fail("auto compaction does not run")
188        self.verify_cluster_stats(self.servers[:self.nodes_in + 1])
189        remote_client.disconnect()
190
191    def rebalance_out_with_DB_compaction(self):
192        self.log.info("create a cluster of all the available servers")
193        self.cluster.rebalance(self.servers[:self.num_servers],
194                               self.servers[1:self.num_servers], [])
195        self.disable_compaction()
196        self._load_all_buckets(self.master, self.gen_load, "create", 0, 1)
197        self._montior_DB_fragmentation()
198        servs_out = [self.servers[self.num_servers - i - 1] for i in range(self.nodes_out)]
199        rebalance = self.cluster.async_rebalance([self.master], [], servs_out)
200        compaction_task = self.cluster.async_compact_bucket(self.master, self.default_bucket_name)
201        result = compaction_task.result(self.wait_timeout * 5)
202        self.assertTrue(result, "Compaction didn't finished correctly. Please check diags")
203        rebalance.result()
204        self.verify_cluster_stats(self.servers[:self.num_servers - self.nodes_out])
205
206    def rebalance_out_with_auto_DB_compaction(self):
207        remote_client = RemoteMachineShellConnection(self.master)
208        rest = RestConnection(self.master)
209        self.log.info("create a cluster of all the available servers")
210        self.cluster.rebalance(self.servers[:self.num_servers],
211                               self.servers[1:self.num_servers], [])
212        self.set_auto_compaction(rest, dbFragmentThresholdPercentage=self.autocompaction_value)
213        self._load_all_buckets(self.master, self.gen_load, "create", 0, 1)
214        self._montior_DB_fragmentation()
215        servs_out = [self.servers[self.num_servers - i - 1] for i in range(self.nodes_out)]
216        rebalance = self.cluster.async_rebalance([self.master], [], servs_out)
217        compact_run = remote_client.wait_till_compaction_end(rest, self.default_bucket_name,
218                                                                     timeout_in_seconds=(self.wait_timeout * 5))
219        rebalance.result()
220        monitor_fragm = self.cluster.async_monitor_db_fragmentation(self.master, 0, self.default_bucket_name)
221        result = monitor_fragm.result()
222        if compact_run:
223            self.log.info("auto compaction run successfully")
224        elif result:
225            self.log.info("Compaction is already completed")
226        else:
227            self.fail("auto compaction does not run")
228        self.verify_cluster_stats(self.servers[:self.num_servers - self.nodes_out])
229        remote_client.disconnect()
230
231    def rebalance_in_out_with_DB_compaction(self):
232        self.assertTrue(self.num_servers > self.nodes_in + self.nodes_out,
233                            "ERROR: Not enough nodes to do rebalance in and out")
234        servs_init = self.servers[:self.nodes_init]
235        servs_in = [self.servers[i + self.nodes_init] for i in range(self.nodes_in)]
236        servs_out = [self.servers[self.nodes_init - i - 1] for i in range(self.nodes_out)]
237        result_nodes = set(servs_init + servs_in) - set(servs_out)
238        self.disable_compaction()
239        self._load_all_buckets(self.master, self.gen_load, "create", 0, 1)
240        rebalance = self.cluster.async_rebalance(servs_init, servs_in, servs_out)
241        while rebalance.state != "FINISHED":
242            self._montior_DB_fragmentation()
243            compaction_task = self.cluster.async_compact_bucket(self.master, self.default_bucket_name)
244            result = compaction_task.result(self.wait_timeout * 5)
245            self.assertTrue(result, "Compaction didn't finished correctly. Please check diags")
246        rebalance.result()
247        self.verify_cluster_stats(result_nodes)
248
249    def rebalance_in_out_with_auto_DB_compaction(self):
250        remote_client = RemoteMachineShellConnection(self.master)
251        rest = RestConnection(self.master)
252        self.assertTrue(self.num_servers > self.nodes_in + self.nodes_out,
253                            "ERROR: Not enough nodes to do rebalance in and out")
254        servs_init = self.servers[:self.nodes_init]
255        servs_in = [self.servers[i + self.nodes_init] for i in range(self.nodes_in)]
256        servs_out = [self.servers[self.nodes_init - i - 1] for i in range(self.nodes_out)]
257        result_nodes = set(servs_init + servs_in) - set(servs_out)
258        self.set_auto_compaction(rest, dbFragmentThresholdPercentage=self.autocompaction_value)
259        self._load_all_buckets(self.master, self.gen_load, "create", 0, 1)
260        rebalance = self.cluster.async_rebalance(servs_init, servs_in, servs_out)
261        while rebalance.state != "FINISHED":
262            self._montior_DB_fragmentation()
263            compact_run = remote_client.wait_till_compaction_end(rest, self.default_bucket_name,
264                                                                 timeout_in_seconds=(self.wait_timeout * 5))
265        rebalance.result()
266        monitor_fragm = self.cluster.async_monitor_db_fragmentation(self.master, 0, self.default_bucket_name)
267        result = monitor_fragm.result()
268        if compact_run:
269            self.log.info("auto compaction run successfully")
270        elif result:
271            self.log.info("Compaction is already completed")
272        else:
273            self.fail("auto compaction does not run")
274        self.verify_cluster_stats(result_nodes)
275        remote_client.disconnect()
276
277    def test_database_time_compaction(self):
278        remote_client = RemoteMachineShellConnection(self.master)
279        rest = RestConnection(self.master)
280        currTime = datetime.datetime.now()
281        fromTime = currTime + datetime.timedelta(hours=1)
282        toTime = currTime + datetime.timedelta(hours=10)
283        self.set_auto_compaction(rest, dbFragmentThresholdPercentage=self.autocompaction_value, allowedTimePeriodFromHour=fromTime.hour,
284                                 allowedTimePeriodFromMin=fromTime.minute, allowedTimePeriodToHour=toTime.hour, allowedTimePeriodToMin=toTime.minute,
285                                 allowedTimePeriodAbort="false")
286        self._load_all_buckets(self.master, self.gen_load, "create", 0, 1)
287        self._montior_DB_fragmentation()
288        for i in xrange(10):
289            active_tasks = self.cluster.async_monitor_active_task(self.master, "bucket_compaction", "bucket", wait_task=False)
290            for active_task in active_tasks:
291                result = active_task.result()
292                self.assertTrue(result)
293                self.sleep(2)
294        currTime = datetime.datetime.now()
295        #Need to make it configurable
296        newTime = currTime + datetime.timedelta(minutes=5)
297        self.set_auto_compaction(rest, dbFragmentThresholdPercentage=self.autocompaction_value, allowedTimePeriodFromHour=currTime.hour,
298                                 allowedTimePeriodFromMin=currTime.minute, allowedTimePeriodToHour=newTime.hour, allowedTimePeriodToMin=newTime.minute,
299                                 allowedTimePeriodAbort="false")
300        compact_run = remote_client.wait_till_compaction_end(rest, self.default_bucket_name,
301                                                                     timeout_in_seconds=(self.wait_timeout * 5))
302        if compact_run:
303            self.log.info("auto compaction run successfully")
304        else:
305            self.fail("auto compaction does not run")
306        remote_client.disconnect()
307
308    def rebalance_in_with_DB_time_compaction(self):
309        remote_client = RemoteMachineShellConnection(self.master)
310        rest = RestConnection(self.master)
311        currTime = datetime.datetime.now()
312        fromTime = currTime + datetime.timedelta(hours=1)
313        toTime = currTime + datetime.timedelta(hours=24)
314        self.set_auto_compaction(rest, dbFragmentThresholdPercentage=self.autocompaction_value, allowedTimePeriodFromHour=fromTime.hour,
315                                 allowedTimePeriodFromMin=fromTime.minute, allowedTimePeriodToHour=toTime.hour, allowedTimePeriodToMin=toTime.minute,
316                                 allowedTimePeriodAbort="false")
317        self._load_all_buckets(self.master, self.gen_load, "create", 0, 1)
318        self._montior_DB_fragmentation()
319        for i in xrange(10):
320            active_tasks = self.cluster.async_monitor_active_task(self.master, "bucket_compaction", "bucket", wait_task=False)
321            for active_task in active_tasks:
322                result = active_task.result()
323                self.assertTrue(result)
324                self.sleep(2)
325        currTime = datetime.datetime.now()
326        #Need to make it configurable
327        newTime = currTime + datetime.timedelta(minutes=5)
328        self.set_auto_compaction(rest, dbFragmentThresholdPercentage=self.autocompaction_value, allowedTimePeriodFromHour=currTime.hour,
329                                 allowedTimePeriodFromMin=currTime.minute, allowedTimePeriodToHour=newTime.hour, allowedTimePeriodToMin=newTime.minute,
330                                 allowedTimePeriodAbort="false")
331        servs_in = self.servers[self.nodes_init:self.nodes_in + 1]
332        rebalance = self.cluster.async_rebalance([self.master], servs_in, [])
333        compact_run = remote_client.wait_till_compaction_end(rest, self.default_bucket_name,
334                                                                     timeout_in_seconds=(self.wait_timeout * 5))
335        rebalance.result()
336        if compact_run:
337            self.log.info("auto compaction run successfully")
338        else:
339            self.fail("auto compaction does not run")
340        remote_client.disconnect()
341
342    def test_database_size_compaction(self):
343        rest = RestConnection(self.master)
344        percent_threshold = self.autocompaction_value * 1048576
345        self.set_auto_compaction(rest, dbFragmentThreshold=percent_threshold)
346        self._load_all_buckets(self.master, self.gen_load, "create", 0, 1)
347        end_time = time.time() + self.wait_timeout * 50
348        monitor_fragm = self.cluster.async_monitor_disk_size_fragmentation(self.master, percent_threshold, self.default_bucket_name)
349        while monitor_fragm.state != "FINISHED":
350            if end_time < time.time():
351                self.fail("Fragmentation level is not reached in %s sec" % self.wait_timeout * 50)
352            try:
353                monitor_fragm = self.cluster.async_monitor_disk_size_fragmentation(self.master, percent_threshold, self.default_bucket_name)
354                self._load_all_buckets(self.master, self.gen_update, "update", 0)
355                active_tasks = self.cluster.async_monitor_active_task(self.master, "bucket_compaction", "bucket", wait_task=False)
356                for active_task in active_tasks:
357                    result = active_task.result()
358                    self.assertTrue(result)
359                    self.sleep(2)
360            except Exception, ex:
361                self.log.error("Load cannot be performed: %s" % str(ex))
362                self.fail(ex)
363        monitor_fragm.result()
364
365    def test_start_stop_DB_compaction(self):
366        rest = RestConnection(self.master)
367        remote_client = RemoteMachineShellConnection(self.master)
368        self._load_all_buckets(self.master, self.gen_load, "create", 0, 1)
369        self.disable_compaction()
370        self._montior_DB_fragmentation()
371        compaction_task = self.cluster.async_compact_bucket(self.master, self.default_bucket_name)
372        self._cancel_bucket_compaction(rest, self.default_bucket_name)
373        compaction_task.result(self.wait_timeout)
374        self.cluster.async_compact_bucket(self.master, self.default_bucket_name)
375        compact_run = remote_client.wait_till_compaction_end(rest, self.default_bucket_name, timeout_in_seconds=self.wait_timeout)
376        compaction_task.result(self.wait_timeout)
377        if compact_run:
378            self.log.info("auto compaction run successfully")
379        else:
380            self.fail("auto compaction does not run")
381        remote_client.disconnect()
382
383    def test_start_stop_auto_DB_compaction(self):
384        threads = []
385        rest = RestConnection(self.master)
386        self.set_auto_compaction(rest, dbFragmentThresholdPercentage=self.autocompaction_value)
387        self._load_all_buckets(self.master, self.gen_load, "create", 0, 1)
388        threads.append(Thread(target=self._montior_DB_fragmentation, name="DB_Thread", args=()))
389        threads.append(Thread(target=self._cancel_bucket_compaction, name="cancel_Thread", args=(rest, self.default_bucket_name,)))
390        for thread in threads:
391            thread.start()
392            self.sleep(2)
393        for thread in threads:
394            thread.join()
395        if self.is_crashed.is_set():
396            self.fail("Error occurred during test run")
397
398    def _cancel_bucket_compaction(self, rest, bucket):
399        remote_client = RemoteMachineShellConnection(self.master)
400        compaction_running = True
401        end_time = time.time() + self.wait_timeout * 5
402        while compaction_running:
403            if end_time < time.time():
404                self.is_crashed.set()
405                self.fail("Compaction is not started in %s sec" % end_time)
406            tasks = rest.active_tasks()
407            for task in tasks:
408                if task["type"] == "bucket_compaction":
409                    try:
410                        result = self.cluster.cancel_bucket_compaction(self.master, bucket)
411                        self.assertTrue(result)
412                        remote_client.wait_till_compaction_end(rest, self.default_bucket_name, self.wait_timeout)
413                        compaction_running = False
414                    except Exception, ex:
415                        self.is_crashed.set()
416                        self.log.error("Compaction cannot be cancelled: %s" % str(ex))
417            remote_client.disconnect()
418
419    def test_auto_compaction_with_multiple_buckets(self):
420        remote_client = RemoteMachineShellConnection(self.master)
421        rest = RestConnection(self.master)
422        for bucket in self.buckets:
423            if bucket.name == "default":
424                self.disable_compaction()
425            else:
426                self.set_auto_compaction(rest, dbFragmentThresholdPercentage=self.autocompaction_value, bucket=bucket.name)
427        self._load_all_buckets(self.master, self.gen_load, "create", 0, 1)
428        end_time = time.time() + self.wait_timeout * 30
429        for bucket in self.buckets:
430            monitor_fragm = self.cluster.async_monitor_db_fragmentation(self.master, self.autocompaction_value, bucket.name)
431            while monitor_fragm.state != "FINISHED":
432                if end_time < time.time():
433                    self.fail("Fragmentation level is not reached in %s sec" % self.wait_timeout * 30)
434                try:
435                    self._load_all_buckets(self.servers[0], self.gen_update, "update", 0)
436                except Exception, ex:
437                    self.log.error("Load cannot be performed: %s" % str(ex))
438                    self.fail(ex)
439            monitor_fragm.result()
440            compact_run = remote_client.wait_till_compaction_end(rest, bucket.name,
441                                                                     timeout_in_seconds=(self.wait_timeout * 5))
442            if compact_run:
443                self.log.info("auto compaction run successfully")
444        remote_client.disconnect()
445
446    def _montior_DB_fragmentation(self):
447        monitor_fragm = self.cluster.async_monitor_db_fragmentation(self.master, self.autocompaction_value, self.default_bucket_name)
448        end_time = time.time() + self.wait_timeout * 30
449        while monitor_fragm.state != "FINISHED":
450            if end_time < time.time():
451                self.fail("Fragmentation level is not reached in %s sec" % self.wait_timeout * 30)
452            try:
453                self._load_all_buckets(self.master, self.gen_update, "update", 0)
454            except Exception, ex:
455                self.is_crashed.set()
456                self.log.error("Load cannot be performed: %s" % str(ex))
457                self.fail(ex)
458        result = monitor_fragm.result()
459        if not result:
460            self.is_crashed.set()
461        self.assertTrue(result, "Fragmentation level is not reached")
462