import unittest
import logger
import random
import time
import json
import datetime
from threading import Thread, Event
from TestInput import TestInputSingleton
from basetestcase import BaseTestCase
from membase.api.rest_client import RestConnection
from membase.helper.bucket_helper import BucketOperationHelper
from remote.remote_util import RemoteMachineShellConnection
from couchbase_helper.documentgenerator import BlobGenerator
from memcached.helper.data_helper import MemcachedClientHelper, VBucketAwareMemcached
from testconstants import MIN_COMPACTION_THRESHOLD
from testconstants import MAX_COMPACTION_THRESHOLD
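# Auto-compaction test suite: exercises fragmentation-threshold validation,
# percentage-, size- and time-window-based auto compaction, compaction racing
# with rebalance in/out, and start/stop of manual compaction runs.
#
# As with other testrunner suites, parameters come in through the -t test
# spec. Illustrative invocation (paths and param values are examples, adjust
# to your environment):
#
#   ./testrunner -i cluster.ini \
#       -t autocompaction.AutoCompactionTests.test_database_fragmentation,autocompaction_value=50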
class AutoCompactionTests(BaseTestCase):

    servers = None
    clients = None
    log = None
    input = None

    def setUp(self):
        super(AutoCompactionTests, self).setUp()
        self.autocompaction_value = self.input.param("autocompaction_value", 0)
        self.is_crashed = Event()
        self.during_ops = self.input.param("during_ops", None)
        self.gen_load = BlobGenerator('compact', 'compact-', self.value_size, start=0, end=self.num_items)
        self.gen_update = BlobGenerator('compact', 'compact-', self.value_size, start=0, end=(self.num_items / 2))
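    # Helper kept for reference (only referenced from a commented-out call
    # below): loads count * 1000 JSON docs of roughly `size` bytes through a
    # vbucket-aware memcached client.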
    @staticmethod
    def insert_key(serverInfo, bucket_name, count, size):
        rest = RestConnection(serverInfo)
        smart = VBucketAwareMemcached(rest, bucket_name)
        for i in xrange(count * 1000):
            key = "key_" + str(i)
            # a random flags value is generated but not sent; the set call below stores with flags=0
            flag = random.randint(1, 999)
            value = {"value": MemcachedClientHelper.create_value("*", size)}
            smart.memcached(key).set(key, 0, 0, json.dumps(value))
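    # Drives mutations against the bucket until the fragmentation monitor
    # reports the target level, the shared is_crashed event fires, or
    # wait_timeout * 5 seconds elapse; used as the body of the load thread
    # in test_database_fragmentation.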
    def load(self, server, compaction_value, bucket_name, gen):
        self.log.info('in the load, wait time is {0}'.format(self.wait_timeout))
        monitor_fragm = self.cluster.async_monitor_db_fragmentation(server, compaction_value, bucket_name)
        end_time = time.time() + self.wait_timeout * 5
        # generate load until the fragmentation level is reached
        while monitor_fragm.state != "FINISHED":
            if self.is_crashed.is_set():
                self.cluster.shutdown(force=True)
                return

            if end_time < time.time():
                self.err = "Fragmentation level is not reached in %s sec" % (self.wait_timeout * 5)
                return
            # update docs to create fragmentation
            try:
                self._load_all_buckets(server, gen, "update", 0)
            except Exception, ex:
                self.is_crashed.set()
                self.log.error("Load cannot be performed: %s" % str(ex))
        monitor_fragm.result()
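    # For an out-of-range threshold the REST call must be rejected with
    # "Allowed range is 2 - 100"; for a valid threshold the test creates the
    # default bucket, loads items sized to half of the available quota, then
    # rewrites them with smaller values so the stale revisions push
    # fragmentation over the threshold and auto compaction kicks in.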
    def test_database_fragmentation(self):
        self.log.info('start test_database_fragmentation')
        self.err = None
        BucketOperationHelper.delete_all_buckets_or_assert(self.servers, self)
        percent_threshold = self.autocompaction_value
        bucket_name = "default"
        MAX_RUN = 100
        item_size = 1024
        update_item_size = item_size * ((float(100 - percent_threshold)) / 100)
        serverInfo = self.servers[0]
        self.log.info(serverInfo)

        rest = RestConnection(serverInfo)
        remote_client = RemoteMachineShellConnection(serverInfo)
        output, rq_content, header = rest.set_auto_compaction("false", dbFragmentThresholdPercentage=percent_threshold, viewFragmntThresholdPercentage=None)

        if not output and (percent_threshold <= MIN_COMPACTION_THRESHOLD or percent_threshold >= MAX_COMPACTION_THRESHOLD):
            self.assertFalse(output, "it should be impossible to set compaction value = {0}%".format(percent_threshold))
            response = json.loads(rq_content)
            self.assertTrue("errors" in response, "Error is not present in response")
            self.assertTrue(str(response["errors"]).find("Allowed range is 2 - 100") > -1,
                            "Error 'Allowed range is 2 - 100' expected, but was '{0}'".format(str(response["errors"])))
            self.log.info("Response contains error = '%(errors)s' as expected" % response)
        elif (output and percent_threshold >= MIN_COMPACTION_THRESHOLD
                     and percent_threshold <= MAX_RUN):
            node_ram_ratio = BucketOperationHelper.base_bucket_ratio(TestInputSingleton.input.servers)
            info = rest.get_nodes_self()
            available_ram = info.memoryQuota * node_ram_ratio / 2
            items = (int(available_ram * 1000) / 2) / item_size
            self.log.info("ITEMS = %s" % items)
            rest.create_bucket(bucket=bucket_name, ramQuotaMB=int(available_ram), authType='sasl',
                               saslPassword='password', replicaNumber=1, proxyPort=11211)
            BucketOperationHelper.wait_for_memcached(serverInfo, bucket_name)
            BucketOperationHelper.wait_for_vbuckets_ready_state(serverInfo, bucket_name)

            self.log.info("******start to load {0}K keys with {1} bytes/key".format(items, item_size))
            #self.insert_key(serverInfo, bucket_name, items, item_size)
            generator = BlobGenerator('compact', 'compact-', int(item_size), start=0, end=(items * 1000))
            self._load_all_buckets(self.master, generator, "create", 0, 1, batch_size=1000)
            self.log.info("sleep 10 seconds before the next run")
            time.sleep(10)

            self.log.info("********start to update {0}K keys with smaller value {1} bytes/key".format(items,
                                                                                                      int(update_item_size)))
            generator_update = BlobGenerator('compact', 'compact-', int(update_item_size), start=0, end=(items * 1000))
            if self.during_ops:
                if self.during_ops == "change_port":
                    self.change_port(new_port=self.input.param("new_port", "9090"))
                    self.master.port = self.input.param("new_port", "9090")
                elif self.during_ops == "change_password":
                    old_pass = self.master.rest_password
                    self.change_password(new_password=self.input.param("new_password", "new_pass"))
                    self.master.rest_password = self.input.param("new_password", "new_pass")
                rest = RestConnection(self.master)
            insert_thread = Thread(target=self.load,
                                   name="insert",
                                   args=(self.master, self.autocompaction_value,
                                         self.default_bucket_name, generator_update))
            try:
                self.log.info('starting the load thread')
                insert_thread.start()
                compact_run = remote_client.wait_till_compaction_end(rest, bucket_name,
                                                                     timeout_in_seconds=(self.wait_timeout * 10))
                if not compact_run:
                    self.fail("auto compaction did not run")
                else:
                    self.log.info("auto compaction ran successfully")
            except Exception, ex:
                self.log.info("exception in auto compaction")
                if self.during_ops:
                    if self.during_ops == "change_password":
                        self.change_password(new_password=old_pass)
                    elif self.during_ops == "change_port":
                        self.change_port(new_port='8091',
                                         current_port=self.input.param("new_port", "9090"))
                if str(ex).find("enospc") != -1:
                    self.is_crashed.set()
                    self.log.error("Disk is out of space, unable to load more data")
                    insert_thread.join()
                else:
                    # stop the load thread via the shared event instead of the
                    # unreliable private Thread._Thread__stop(), then re-raise
                    # with the original traceback
                    self.is_crashed.set()
                    insert_thread.join()
                    raise
            else:
                insert_thread.join()
                if self.err is not None:
                    self.fail(self.err)
        else:
            self.log.error("Unknown error: unexpected result for compaction threshold {0}%".format(percent_threshold))
        if self.during_ops:
            if self.during_ops == "change_password":
                self.change_password(new_password=old_pass)
            elif self.during_ops == "change_port":
                self.change_port(new_port='8091',
                                 current_port=self.input.param("new_port", "9090"))
    def _viewFragmentationThreshold(self):
        for serverInfo in self.servers:
            self.log.info(serverInfo)
            rest = RestConnection(serverInfo)
            rest.set_auto_compaction(dbFragmentThresholdPercentage=80, viewFragmntThresholdPercentage=80)
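    # Manual compaction issued while a rebalance-in is in flight: auto
    # compaction is disabled, fragmentation is driven up, then compact_bucket
    # and the rebalance must both complete.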
    def rebalance_in_with_DB_compaction(self):
        self.disable_compaction()
        self._load_all_buckets(self.master, self.gen_load, "create", 0, 1)
        self._monitor_DB_fragmentation()
        servs_in = self.servers[self.nodes_init:self.nodes_in + 1]
        rebalance = self.cluster.async_rebalance([self.master], servs_in, [])
        self.sleep(5)
        compaction_task = self.cluster.async_compact_bucket(self.master, self.default_bucket_name)
        result = compaction_task.result(self.wait_timeout * 5)
        self.assertTrue(result, "Compaction didn't finish correctly. Please check diags")
        rebalance.result()
        self.sleep(30)
        self.verify_cluster_stats(self.servers[:self.nodes_in + 1])
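    # Same scenario with auto compaction: the fragmentation threshold is set
    # up front, so compaction should trigger on its own while nodes are added.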
    def rebalance_in_with_auto_DB_compaction(self):
        remote_client = RemoteMachineShellConnection(self.master)
        rest = RestConnection(self.master)
        self.set_auto_compaction(rest, dbFragmentThresholdPercentage=self.autocompaction_value)
        self._load_all_buckets(self.master, self.gen_load, "create", 0, 1)
        self._monitor_DB_fragmentation()
        servs_in = self.servers[1:self.nodes_in + 1]
        rebalance = self.cluster.async_rebalance([self.master], servs_in, [])
        compact_run = remote_client.wait_till_compaction_end(rest, self.default_bucket_name,
                                                             timeout_in_seconds=(self.wait_timeout * 5))
        rebalance.result()
        self.sleep(30)
        monitor_fragm = self.cluster.async_monitor_db_fragmentation(self.master, 0, self.default_bucket_name)
        result = monitor_fragm.result()
        if compact_run:
            self.log.info("auto compaction ran successfully")
        elif result:
            self.log.info("Compaction is already completed")
        else:
            self.fail("auto compaction did not run")
        self.verify_cluster_stats(self.servers[:self.nodes_in + 1])
        remote_client.disconnect()
    def rebalance_out_with_DB_compaction(self):
        self.log.info("create a cluster of all the available servers")
        self.cluster.rebalance(self.servers[:self.num_servers],
                               self.servers[1:self.num_servers], [])
        self.disable_compaction()
        self._load_all_buckets(self.master, self.gen_load, "create", 0, 1)
        self._monitor_DB_fragmentation()
        servs_out = [self.servers[self.num_servers - i - 1] for i in range(self.nodes_out)]
        rebalance = self.cluster.async_rebalance([self.master], [], servs_out)
        compaction_task = self.cluster.async_compact_bucket(self.master, self.default_bucket_name)
        result = compaction_task.result(self.wait_timeout * 5)
        self.assertTrue(result, "Compaction didn't finish correctly. Please check diags")
        rebalance.result()
        self.sleep(30)
        self.verify_cluster_stats(self.servers[:self.num_servers - self.nodes_out])
    def rebalance_out_with_auto_DB_compaction(self):
        remote_client = RemoteMachineShellConnection(self.master)
        rest = RestConnection(self.master)
        self.log.info("create a cluster of all the available servers")
        self.cluster.rebalance(self.servers[:self.num_servers],
                               self.servers[1:self.num_servers], [])
        self.set_auto_compaction(rest, dbFragmentThresholdPercentage=self.autocompaction_value)
        self._load_all_buckets(self.master, self.gen_load, "create", 0, 1)
        self._monitor_DB_fragmentation()
        servs_out = [self.servers[self.num_servers - i - 1] for i in range(self.nodes_out)]
        rebalance = self.cluster.async_rebalance([self.master], [], servs_out)
        compact_run = remote_client.wait_till_compaction_end(rest, self.default_bucket_name,
                                                             timeout_in_seconds=(self.wait_timeout * 5))
        rebalance.result()
        self.sleep(30)
        monitor_fragm = self.cluster.async_monitor_db_fragmentation(self.master, 0, self.default_bucket_name)
        result = monitor_fragm.result()
        if compact_run:
            self.log.info("auto compaction ran successfully")
        elif result:
            self.log.info("Compaction is already completed")
        else:
            self.fail("auto compaction did not run")
        self.verify_cluster_stats(self.servers[:self.num_servers - self.nodes_out])
        remote_client.disconnect()
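    # The in/out variants below keep forcing fragmentation (and compaction)
    # in a loop for as long as the swap rebalance is still running.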
    def rebalance_in_out_with_DB_compaction(self):
        self.assertTrue(self.num_servers > self.nodes_in + self.nodes_out,
                        "ERROR: Not enough nodes to do rebalance in and out")
        servs_init = self.servers[:self.nodes_init]
        servs_in = [self.servers[i + self.nodes_init] for i in range(self.nodes_in)]
        servs_out = [self.servers[self.nodes_init - i - 1] for i in range(self.nodes_out)]
        result_nodes = set(servs_init + servs_in) - set(servs_out)
        self.disable_compaction()
        self._load_all_buckets(self.master, self.gen_load, "create", 0, 1)
        rebalance = self.cluster.async_rebalance(servs_init, servs_in, servs_out)
        while rebalance.state != "FINISHED":
            self._monitor_DB_fragmentation()
            compaction_task = self.cluster.async_compact_bucket(self.master, self.default_bucket_name)
            result = compaction_task.result(self.wait_timeout * 5)
            self.assertTrue(result, "Compaction didn't finish correctly. Please check diags")
        rebalance.result()
        self.sleep(30)
        self.verify_cluster_stats(result_nodes)
    def rebalance_in_out_with_auto_DB_compaction(self):
        remote_client = RemoteMachineShellConnection(self.master)
        rest = RestConnection(self.master)
        self.assertTrue(self.num_servers > self.nodes_in + self.nodes_out,
                        "ERROR: Not enough nodes to do rebalance in and out")
        servs_init = self.servers[:self.nodes_init]
        servs_in = [self.servers[i + self.nodes_init] for i in range(self.nodes_in)]
        servs_out = [self.servers[self.nodes_init - i - 1] for i in range(self.nodes_out)]
        result_nodes = set(servs_init + servs_in) - set(servs_out)
        self.set_auto_compaction(rest, dbFragmentThresholdPercentage=self.autocompaction_value)
        self._load_all_buckets(self.master, self.gen_load, "create", 0, 1)
        rebalance = self.cluster.async_rebalance(servs_init, servs_in, servs_out)
        while rebalance.state != "FINISHED":
            self._monitor_DB_fragmentation()
            compact_run = remote_client.wait_till_compaction_end(rest, self.default_bucket_name,
                                                                 timeout_in_seconds=(self.wait_timeout * 5))
        rebalance.result()
        self.sleep(30)
        monitor_fragm = self.cluster.async_monitor_db_fragmentation(self.master, 0, self.default_bucket_name)
        result = monitor_fragm.result()
        if compact_run:
            self.log.info("auto compaction ran successfully")
        elif result:
            self.log.info("Compaction is already completed")
        else:
            self.fail("auto compaction did not run")
        self.verify_cluster_stats(result_nodes)
        remote_client.disconnect()
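    # Time-window tests: first configure a compaction window that starts an
    # hour from now and verify no bucket_compaction task appears, then move
    # the window to the current time and expect compaction to run.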
    def test_database_time_compaction(self):
        remote_client = RemoteMachineShellConnection(self.master)
        rest = RestConnection(self.master)
        currTime = datetime.datetime.now()
        fromTime = currTime + datetime.timedelta(hours=1)
        toTime = currTime + datetime.timedelta(hours=10)
        self.set_auto_compaction(rest, dbFragmentThresholdPercentage=self.autocompaction_value, allowedTimePeriodFromHour=fromTime.hour,
                                 allowedTimePeriodFromMin=fromTime.minute, allowedTimePeriodToHour=toTime.hour, allowedTimePeriodToMin=toTime.minute,
                                 allowedTimePeriodAbort="false")
        self._load_all_buckets(self.master, self.gen_load, "create", 0, 1)
        self._monitor_DB_fragmentation()
        # the allowed window is still an hour away, so verify repeatedly that
        # no bucket_compaction task is running
        for i in xrange(10):
            active_tasks = self.cluster.async_monitor_active_task(self.master, "bucket_compaction", "bucket", wait_task=False)
            for active_task in active_tasks:
                result = active_task.result()
                self.assertTrue(result)
                self.sleep(2)
        currTime = datetime.datetime.now()
        # TODO: make the compaction window configurable
        newTime = currTime + datetime.timedelta(minutes=5)
        self.set_auto_compaction(rest, dbFragmentThresholdPercentage=self.autocompaction_value, allowedTimePeriodFromHour=currTime.hour,
                                 allowedTimePeriodFromMin=currTime.minute, allowedTimePeriodToHour=newTime.hour, allowedTimePeriodToMin=newTime.minute,
                                 allowedTimePeriodAbort="false")
        compact_run = remote_client.wait_till_compaction_end(rest, self.default_bucket_name,
                                                             timeout_in_seconds=(self.wait_timeout * 5))
        if compact_run:
            self.log.info("auto compaction ran successfully")
        else:
            self.fail("auto compaction did not run")
        remote_client.disconnect()
    def rebalance_in_with_DB_time_compaction(self):
        remote_client = RemoteMachineShellConnection(self.master)
        rest = RestConnection(self.master)
        currTime = datetime.datetime.now()
        fromTime = currTime + datetime.timedelta(hours=1)
        toTime = currTime + datetime.timedelta(hours=24)
        self.set_auto_compaction(rest, dbFragmentThresholdPercentage=self.autocompaction_value, allowedTimePeriodFromHour=fromTime.hour,
                                 allowedTimePeriodFromMin=fromTime.minute, allowedTimePeriodToHour=toTime.hour, allowedTimePeriodToMin=toTime.minute,
                                 allowedTimePeriodAbort="false")
        self._load_all_buckets(self.master, self.gen_load, "create", 0, 1)
        self._monitor_DB_fragmentation()
        # the allowed window is still an hour away, so verify repeatedly that
        # no bucket_compaction task is running
        for i in xrange(10):
            active_tasks = self.cluster.async_monitor_active_task(self.master, "bucket_compaction", "bucket", wait_task=False)
            for active_task in active_tasks:
                result = active_task.result()
                self.assertTrue(result)
                self.sleep(2)
        currTime = datetime.datetime.now()
        # TODO: make the compaction window configurable
        newTime = currTime + datetime.timedelta(minutes=5)
        self.set_auto_compaction(rest, dbFragmentThresholdPercentage=self.autocompaction_value, allowedTimePeriodFromHour=currTime.hour,
                                 allowedTimePeriodFromMin=currTime.minute, allowedTimePeriodToHour=newTime.hour, allowedTimePeriodToMin=newTime.minute,
                                 allowedTimePeriodAbort="false")
        servs_in = self.servers[self.nodes_init:self.nodes_in + 1]
        rebalance = self.cluster.async_rebalance([self.master], servs_in, [])
        compact_run = remote_client.wait_till_compaction_end(rest, self.default_bucket_name,
                                                             timeout_in_seconds=(self.wait_timeout * 5))
        rebalance.result()
        if compact_run:
            self.log.info("auto compaction ran successfully")
        else:
            self.fail("auto compaction did not run")
        remote_client.disconnect()
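    # Size-based trigger: dbFragmentThreshold takes an absolute file size in
    # bytes rather than a percentage, so here autocompaction_value is treated
    # as megabytes and converted to bytes.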
    def test_database_size_compaction(self):
        rest = RestConnection(self.master)
        size_threshold = self.autocompaction_value * 1048576
        self.set_auto_compaction(rest, dbFragmentThreshold=size_threshold)
        self._load_all_buckets(self.master, self.gen_load, "create", 0, 1)
        end_time = time.time() + self.wait_timeout * 5
        monitor_fragm = self.cluster.async_monitor_disk_size_fragmentation(self.master, size_threshold, self.default_bucket_name)
        while monitor_fragm.state != "FINISHED":
            if end_time < time.time():
                self.fail("Fragmentation level is not reached in %s sec" % (self.wait_timeout * 5))
            try:
                monitor_fragm = self.cluster.async_monitor_disk_size_fragmentation(self.master, size_threshold, self.default_bucket_name)
                self._load_all_buckets(self.master, self.gen_update, "update", 0)
                active_tasks = self.cluster.async_monitor_active_task(self.master, "bucket_compaction", "bucket", wait_task=False)
                for active_task in active_tasks:
                    result = active_task.result()
                    self.assertTrue(result)
                    self.sleep(2)
            except Exception, ex:
                self.log.error("Load cannot be performed: %s" % str(ex))
                self.fail(ex)
        monitor_fragm.result()
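    # Start/stop: kick off a manual compaction, cancel it mid-run, start it
    # again and verify the second run finishes.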
    def test_start_stop_DB_compaction(self):
        rest = RestConnection(self.master)
        remote_client = RemoteMachineShellConnection(self.master)
        self.log.info('loading the buckets')
        self._load_all_buckets(self.master, self.gen_load, "create", 0, 1)
        self.log.info('disabling compaction')
        self.disable_compaction()
        self.log.info('monitor db fragmentation')
        self._monitor_DB_fragmentation()
        self.log.info('async compact the bucket')
        compaction_task = self.cluster.async_compact_bucket(self.master, self.default_bucket_name)
        self.log.info('cancel bucket compaction')
        self._cancel_bucket_compaction(rest, self.default_bucket_name)
        #compaction_task.result(self.wait_timeout)
        self.log.info('compact again')
        self.cluster.async_compact_bucket(self.master, self.default_bucket_name)
        self.log.info('waiting for compaction to end')
        compact_run = remote_client.wait_till_compaction_end(rest, self.default_bucket_name, timeout_in_seconds=self.wait_timeout)
        if compact_run:
            self.log.info("compaction completed successfully")
        else:
            self.fail("compaction did not complete")
        remote_client.disconnect()
    # Created for MB-14976: more than 65536 couchstore file revisions are
    # needed to trigger the problem.
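    # Couchstore keeps the file revision as a numeric suffix on each vbucket
    # file and compaction writes a new revision, so renaming the .1/.2
    # suffixes to .65535 puts the next few compactions right at the 16-bit
    # rollover.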
    def test_large_file_version(self):
        rest = RestConnection(self.master)
        remote_client = RemoteMachineShellConnection(self.master)
        remote_client.extract_remote_info()

        self._load_all_buckets(self.master, self.gen_load, "create", 0, 1)
        self.disable_compaction()
        self._monitor_DB_fragmentation()

        # bump the data file revision suffix (.1/.2 -> .65535) so the next
        # compactions cross the 65536 revision boundary
        remote_client.stop_couchbase()
        time.sleep(5)
        remote_client.execute_command("cd /opt/couchbase/var/lib/couchbase/data/default;rename .1 .65535 *.1")
        remote_client.execute_command("cd /opt/couchbase/var/lib/couchbase/data/default;rename .2 .65535 *.2")
        remote_client.start_couchbase()

        for i in range(5):
            self.log.info("starting a compaction iteration")
            compaction_task = self.cluster.async_compact_bucket(self.master, self.default_bucket_name)
            compact_run = remote_client.wait_till_compaction_end(rest, self.default_bucket_name, timeout_in_seconds=self.wait_timeout)
            compaction_task.result(self.wait_timeout)

        if compact_run:
            self.log.info("compaction ran successfully")
        else:
            self.fail("compaction did not run")

        remote_client.disconnect()
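    # Runs the fragmentation driver and the cancel helper on parallel threads;
    # either one sets is_crashed on failure, which fails the test after join.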
    def test_start_stop_auto_DB_compaction(self):
        threads = []
        rest = RestConnection(self.master)
        self.set_auto_compaction(rest, dbFragmentThresholdPercentage=self.autocompaction_value)
        self._load_all_buckets(self.master, self.gen_load, "create", 0, 1)
        threads.append(Thread(target=self._monitor_DB_fragmentation, name="DB_Thread", args=()))
        threads.append(Thread(target=self._cancel_bucket_compaction, name="cancel_Thread", args=(rest, self.default_bucket_name,)))
        for thread in threads:
            thread.start()
            self.sleep(2)
        for thread in threads:
            thread.join()
        if self.is_crashed.is_set():
            self.fail("Error occurred during test run")
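    # Cancels a running bucket compaction through the cluster task API and
    # then waits for any in-flight compaction to drain before returning.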
    def _cancel_bucket_compaction(self, rest, bucket):
        remote_client = RemoteMachineShellConnection(self.master)
        try:
            result = self.cluster.cancel_bucket_compaction(self.master, bucket)
            self.assertTrue(result)
            remote_client.wait_till_compaction_end(rest, self.default_bucket_name, self.wait_timeout)
        except Exception, ex:
            self.is_crashed.set()
            self.log.error("Compaction cannot be cancelled: %s" % str(ex))
        remote_client.disconnect()
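    # Compaction is disabled on the default bucket but enabled with a
    # threshold on every other bucket; fragmentation is then driven on all
    # of them and completion is checked per bucket.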
    def test_auto_compaction_with_multiple_buckets(self):
        remote_client = RemoteMachineShellConnection(self.master)
        rest = RestConnection(self.master)
        for bucket in self.buckets:
            if bucket.name == "default":
                self.disable_compaction()
            else:
                self.set_auto_compaction(rest, dbFragmentThresholdPercentage=self.autocompaction_value, bucket=bucket.name)
        self._load_all_buckets(self.master, self.gen_load, "create", 0, 1)
        end_time = time.time() + self.wait_timeout * 30
        for bucket in self.buckets:
            monitor_fragm = self.cluster.async_monitor_db_fragmentation(self.master, self.autocompaction_value, bucket.name)
            while monitor_fragm.state != "FINISHED":
                if end_time < time.time():
                    self.fail("Fragmentation level is not reached in %s sec" % (self.wait_timeout * 30))
                try:
                    self._load_all_buckets(self.servers[0], self.gen_update, "update", 0)
                except Exception, ex:
                    self.log.error("Load cannot be performed: %s" % str(ex))
                    self.fail(ex)
            monitor_fragm.result()
            compact_run = remote_client.wait_till_compaction_end(rest, bucket.name,
                                                                 timeout_in_seconds=(self.wait_timeout * 5))
            if compact_run:
                self.log.info("auto compaction ran successfully")
        remote_client.disconnect()
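    # Shared helper: updates docs until fragmentation of the default bucket
    # reaches autocompaction_value, failing the test (and flagging is_crashed)
    # if the level is not hit within wait_timeout * 30 seconds.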
    def _monitor_DB_fragmentation(self):
        monitor_fragm = self.cluster.async_monitor_db_fragmentation(self.master, self.autocompaction_value, self.default_bucket_name)
        end_time = time.time() + self.wait_timeout * 30
        while monitor_fragm.state != "FINISHED":
            if end_time < time.time():
                self.fail("Fragmentation level is not reached in %s sec" % (self.wait_timeout * 30))
            try:
                self._load_all_buckets(self.master, self.gen_update, "update", 0)
            except Exception, ex:
                self.is_crashed.set()
                self.log.error("Load cannot be performed: %s" % str(ex))
                self.fail(ex)
        result = monitor_fragm.result()
        if not result:
            self.is_crashed.set()
        self.assertTrue(result, "Fragmentation level is not reached")