1import unittest
2from TestInput import TestInputSingleton
3import mc_bin_client
4import uuid
5import logger
6import datetime
7from membase.api.rest_client import RestConnection
8from membase.helper.bucket_helper import BucketOperationHelper
9from membase.helper.cluster_helper import ClusterOperationHelper
10from memcached.helper.data_helper import MemcachedClientHelper
11
class SimpleSetGetTestBase(object):
    """Helper that runs a set/get round trip against every bucket of one node.

    The owning ``unittest.TestCase`` calls ``setUp_bucket`` first, then
    ``set_get_test`` and finally ``tearDown_bucket``.  Assertions and failures
    are reported through the test case stored on ``self.test``.
    """

    # All of these are populated by setUp_bucket(); the class-level defaults
    # only keep attribute access safe before set-up has run.
    log = None
    keys = None
    servers = None
    input = None
    test = None
    master = None

    # Number of full passes made over the keys that could not be read back.
    MAX_VERIFY_PASSES = 10
    # Upper bound on how many unverifiable keys are logged one by one.
    MAX_REPORTED_KEYS = 100

    def setUp_bucket(self, unittest):
        """Initialise the first configured server and remove existing buckets.

        Args:
            unittest: the running test case.  It is used for the assertion on
                the input parameters and stored on ``self.test``.  (The name
                shadows the ``unittest`` module inside this method only; it is
                kept because callers may pass it by keyword.)
        """
        self.log = logger.Logger.get_logger()
        self.input = TestInputSingleton.input
        unittest.assertTrue(self.input, msg="input parameters missing...")
        self.test = unittest
        self.master = self.input.servers[0]

        rest = RestConnection(self.master)
        rest.init_cluster(username=self.master.rest_username,
                          password=self.master.rest_password)
        rest.init_cluster_memoryQuota(memoryQuota=rest.get_nodes_self().mcdMemoryReserved)
        ClusterOperationHelper.cleanup_cluster([self.master])
        BucketOperationHelper.delete_all_buckets_or_assert([self.master], self.test)

        # Credentials and the currently reported quota are applied a second
        # time on a fresh connection once the clean-up is done.
        # NOTE(review): presumably because the clean-up can reset them - confirm.
        rest = RestConnection(self.master)
        info = rest.get_nodes_self()
        rest.init_cluster(username=self.master.rest_username,
                          password=self.master.rest_password)
        rest.init_cluster_memoryQuota(memoryQuota=info.memoryQuota)

    def set_get_test(self, value_size, number_of_items):
        """Create four buckets, then store and read back items in each bucket.

        Args:
            value_size: size passed to ``MemcachedClientHelper.create_value``
                for the single value shared by all keys.
            number_of_items: number of keys to store per bucket.

        The test case on ``self.test`` is failed when a bucket does not become
        ready, when no key at all could be stored, when a value read back
        differs from the stored one, or when keys are still unreadable after
        ``MAX_VERIFY_PASSES`` passes.
        """
        fixed_value = MemcachedClientHelper.create_value("S", value_size)
        # (bucket name, replica count)
        specs = [("default", 0),
                 ("set-get-bucket-replica-1", 1),
                 ("set-get-bucket-replica-2", 2),
                 ("set-get-bucket-replica-3", 3)]
        rest = RestConnection(self.master)
        node = rest.get_nodes_self()
        # The node quota is split evenly between the four buckets.
        bucket_ram = int(node.memoryQuota / 4)
        mcport = node.memcached
        for name, replica in specs:
            rest.create_bucket(name, bucket_ram, "sasl", "password", replica, mcport)

        for bucket in rest.get_buckets():
            ready = BucketOperationHelper.wait_for_memcached(self.master, bucket.name)
            self.test.assertTrue(ready, "wait_for_memcached failed")

            client = MemcachedClientHelper.direct_client(self.master, bucket.name)
            try:
                inserted, rejected = self._insert_items(client, fixed_value,
                                                        number_of_items)
                if rejected:
                    self.log.error("{0} set operations were rejected by bucket {1}"
                                   .format(len(rejected), bucket.name))
                    if not inserted:
                        # Nothing to verify: passing here would hide the problem.
                        self.test.fail("unable to insert any key into bucket {0}"
                                       .format(bucket.name))

                unverified = self._verify_items(client, inserted, fixed_value)
            finally:
                # Release the connection even when an assertion aborts the test.
                client.close()

            for key in unverified[:self.MAX_REPORTED_KEYS]:
                self.log.error("unable to verify key : {0}".format(key))
            if unverified:
                self.test.fail("unable to verify {0} keys".format(len(unverified)))

    def _insert_items(self, client, value, number_of_items):
        """Store ``value`` under fresh UUID keys until enough sets succeeded.

        The loop stops after ``number_of_items`` successful sets or after the
        same number of rejected sets, whichever comes first, so a bucket that
        refuses every write cannot stall the test.

        Returns:
            A ``(inserted, rejected)`` tuple of key lists.
        """
        inserted = []
        rejected = []
        while len(inserted) < number_of_items and len(rejected) < number_of_items:
            key = str(uuid.uuid4())
            try:
                client.set(key, 0, 0, value)
            except mc_bin_client.MemcachedError:
                rejected.append(key)
            else:
                inserted.append(key)
        return inserted, rejected

    def _verify_items(self, client, keys, expected_value):
        """Read every key back and compare its value with ``expected_value``.

        Keys whose read raises ``MemcachedError`` are retried in later passes,
        up to ``MAX_VERIFY_PASSES`` passes in total.  A value mismatch fails
        the test immediately.

        Returns:
            The list of keys that could not be read in any pass.
        """
        msg = "memcachedError : {0} - unable to get a pre-inserted key : {1}"
        remaining = list(keys)
        passes = 0
        while remaining and passes < self.MAX_VERIFY_PASSES:
            failed = []
            for key in remaining:
                try:
                    _flag, _extra, value = client.get(key=key)
                except mc_bin_client.MemcachedError as error:
                    self.log.error(msg.format(error.status, key))
                    failed.append(key)
                    continue
                if value != expected_value:
                    self.test.fail("value mismatch for key {0}".format(key))
            # Only the keys that errored are looked at again; the counter is
            # advanced once per pass, not once per key.
            remaining = failed
            passes += 1
        return remaining

    def tearDown_bucket(self):
        """Delete every bucket on the master node, asserting on failure."""
        BucketOperationHelper.delete_all_buckets_or_assert([self.master], self.test)
99
100
101class MembaseBucket(unittest.TestCase):
102
103    simpleSetGetTestBase = None
104    def setUp(self):
105        self.simpleSetGetTestBase = SimpleSetGetTestBase()
106        self.simpleSetGetTestBase.setUp_bucket(self)
107        self._log_start()
108
109    def test_value_100b(self):
110        self.simpleSetGetTestBase.set_get_test(128, 4000)
111
112    def test_value_500kb(self):
113        self.simpleSetGetTestBase.set_get_test(512 * 1024, 400)
114
115    def test_value_10mb(self):
116        self.simpleSetGetTestBase.set_get_test(10 * 1024 * 1024, 10)
117
118    def test_value_1mb(self):
119        self.simpleSetGetTestBase.set_get_test(1 * 1024 * 1024, 40)
120
121    def tearDown(self):
122        if self.simpleSetGetTestBase:
123            self.simpleSetGetTestBase.tearDown_bucket()
124        self._log_finish()
125
126    def _log_start(self):
127        try:
128            msg = "{0} : {1} started ".format(datetime.datetime.now(), self._testMethodName)
129            RestConnection(self.servers[0]).log_client_error(msg)
130        except:
131            pass
132
133
134    def _log_finish(self):
135        try:
136            msg = "{0} : {1} finished ".format(datetime.datetime.now(), self._testMethodName)
137            RestConnection(self.servers[0]).log_client_error(msg)
138        except:
139            pass