1import json
2import time
3import pylibmc
4import hashlib
5from celery.utils.log import get_task_logger
6import testcfg as cfg
7
# Module-level Celery task logger, named after this module.
logger = get_task_logger(__name__)
9
class Cache(object):
    """Key/value cache on top of a pylibmc client, with per-collection indexes.

    Every key written through :meth:`store` is also recorded in a list kept
    under a "collection key", so all members of a collection can later be
    fetched (:meth:`fetchCollection`) or removed (:meth:`clear`) together.
    """

    def __init__(self):
        """Create the pylibmc client from ``cfg.OBJECT_CACHE_IP``/``_PORT``."""
        server = cfg.OBJECT_CACHE_IP + ":" + cfg.OBJECT_CACHE_PORT
        self._mc = pylibmc.Client([server], binary=True)
        self.logger = get_task_logger(__name__)

    def store(self, key, data, collectionKey):
        """Store *data* under *key* and register *key* in *collectionKey*.

        Dict values are serialised with ``json.dumps`` before being written;
        every other value is handed to the client unchanged.
        """
        if isinstance(data, dict):
            data = json.dumps(data)

        self._mc[key] = data

        # Update the collection index. This is a plain read-modify-write of
        # the key list; nothing here guards against concurrent writers.
        keyList = self.retrieve(collectionKey)
        if keyList is None:
            keyList = [key]
        elif key not in keyList:
            keyList.append(key)

        self._mc[collectionKey] = keyList

    def fetchCollection(self, collectionKey):
        """Return the values of every key indexed under *collectionKey*.

        Keys whose value can no longer be retrieved are skipped. An unknown
        collection yields an empty list.
        """
        keys = self.retrieve(collectionKey)
        if keys is None:
            return []

        data = []
        for key in keys:
            val = self.retrieve(key)
            if val is not None:
                data.append(val)
        return data

    def retrieve(self, key, retry_count=5, retry_delay=2):
        """Return the value stored under *key*, or None if it cannot be read.

        :param key: cache key to look up.
        :param retry_count: how many additional attempts to make after an
            unexpected error.
        :param retry_delay: seconds to sleep before each retry.
        :returns: the cached value, or None when the key is missing, when a
            ValueError/SyntaxError is raised by the lookup, or when every
            retry failed.
        """
        remaining = retry_count
        while True:
            try:
                return self._mc[key]
            except KeyError as ex:
                self.logger.info("fail attempt to retrieve key %s" % ex)
                return None
            except (ValueError, SyntaxError):
                return None  # non json
            except Exception as ex:
                if remaining <= 0:
                    self.logger.error(
                        "giving up fetching %s: %s" % (key, ex))
                    return None
                self.logger.info(
                    "error occured in mc protocol fetching %s: %s" % (key, ex))
                self.logger.info("retry attempt: %s" % remaining)
                time.sleep(retry_delay)
                # Loop again so the value obtained by the retry is the one
                # returned to the caller.
                remaining -= 1

    def delete(self, key, collectionKey):
        """Delete *key* and drop it from the *collectionKey* index.

        :raises KeyError: if *key* is not present in the cache.
        """
        del self._mc[key]

        # Remove the key from the index; a missing index or a key that was
        # never indexed simply leaves nothing to update.
        keys = self.retrieve(collectionKey)
        if keys is None:
            return
        try:
            keys.remove(key)
        except ValueError:
            return
        self._mc[collectionKey] = keys

    def clear(self, collectionKey):
        """Delete every key indexed under *collectionKey*, then the index."""
        keyList = self.retrieve(collectionKey)
        if keyList is not None:
            for key in keyList:
                if key is None:
                    continue
                try:
                    del self._mc[key]
                except KeyError as ex:
                    self.logger.error("error clearing key %s"  % ex)
        try:
            del self._mc[collectionKey]
        except KeyError:
            pass  # index already deleted
86
class ObjCacher(Cache):
    """Cache variant for objects that expose an ``id`` attribute.

    The per-object cache key is derived with ``getContextKey`` from the
    collection key and the object's id; the collection key itself is used
    as the index under which the object is registered.
    """

    def allinstances(self, cachekey):
        """Return every retrievable object indexed under *cachekey*."""
        instances = self.fetchCollection(cachekey)
        return instances

    def store(self, cachekey, obj):
        """Cache *obj* in the *cachekey* collection, keyed by ``obj.id``."""
        context_key = getContextKey(cachekey, obj.id)
        base = super(ObjCacher, self)
        base.store(context_key, obj, cachekey)

    def instance(self, cachekey, id_):
        """Return the object with id *id_* from *cachekey*, or None."""
        return self.retrieve(getContextKey(cachekey, id_))

    def delete(self, cachekey, obj):
        """Remove *obj* from the cache and from the *cachekey* index."""
        context_key = getContextKey(cachekey, obj.id)
        base = super(ObjCacher, self)
        base.delete(context_key, cachekey)

    def clear(self, cachekey):
        """Remove every object indexed under *cachekey* and the index."""
        base = super(ObjCacher, self)
        base.clear(cachekey)
106
107
class CacheHelper():
    """Static convenience accessors over the object cache.

    Each accessor builds a fresh ``ObjCacher``/``Cache`` and reads or writes
    one of the collections named by the ``*CACHEKEY`` constants below.
    """

    # Collection keys under which the different object kinds are indexed.
    WORKLOADCACHEKEY = "WORKLOADCACHEKEY"
    TEMPLATECACHEKEY = "TEMPLATECACHEKEY"
    BUCKETSTATUSCACHEKEY = "BUCKETSTATUSCACHEKEY"
    QUERYCACHEKEY = "QUERYCACHEKEY"
    QBUILDCACHEKEY = "QBUILDCACHEKEY"
    VARCACHEKEY = "VARCACHEKEY"
    CLUSTERSTATUSKEY = "CLUSTERSTATUSKEY"
    ACTIVETASKCACHEKEY = "ACTIVETASKCACHEKEY"

    @staticmethod
    def workloads():
        """Return all objects cached under ``WORKLOADCACHEKEY``."""
        return ObjCacher().allinstances(CacheHelper.WORKLOADCACHEKEY)

    @staticmethod
    def templates():
        """Return all objects cached under ``TEMPLATECACHEKEY``."""
        return ObjCacher().allinstances(CacheHelper.TEMPLATECACHEKEY)

    @staticmethod
    def queries():
        """Return all objects cached under ``QUERYCACHEKEY``."""
        return ObjCacher().allinstances(CacheHelper.QUERYCACHEKEY)

    @staticmethod
    def qbuilders():
        """Return all objects cached under ``QBUILDCACHEKEY``."""
        return ObjCacher().allinstances(CacheHelper.QBUILDCACHEKEY)

    @staticmethod
    def clusterstatus(_id):
        """Return the ``CLUSTERSTATUSKEY`` object with id *_id*, or None."""
        return ObjCacher().instance(CacheHelper.CLUSTERSTATUSKEY, _id)

    @staticmethod
    def active_queries():
        """Return the cached queries whose ``active`` attribute is truthy."""
        return [query for query in CacheHelper.queries() if query.active]

    @staticmethod
    def cc_queues():
        """Return the ``cc_queues`` entries of all workloads, flattened."""
        queues = []
        for workload in CacheHelper.workloads():
            if workload.cc_queues is not None:
                queues.extend(workload.cc_queues)
        return queues

    @staticmethod
    def consume_queues():
        """Return the ``consume_queue`` of every workload that has one."""
        return [workload.consume_queue
                for workload in CacheHelper.workloads()
                if workload.consume_queue is not None]

    @staticmethod
    def miss_queues():
        """Return the ``miss_queue`` of every workload that has both a miss
        queue and a consume queue.
        """
        queues = []
        for workload in CacheHelper.workloads():
            if workload.miss_queue is not None and workload.consume_queue is not None:
                # Collect the miss queue itself; the consume queue is
                # already reported by consume_queues().
                queues.append(workload.miss_queue)
        return queues

    @staticmethod
    def task_queues():
        """Return the ``task_queue`` of every workload, then of every query."""
        kv = [workload.task_queue for workload in CacheHelper.workloads()]
        query = [query.task_queue for query in CacheHelper.queries()]
        return kv + query

    @staticmethod
    def queues():
        """Return task, cc, consume and miss queues as one list, in that order."""
        return (CacheHelper.task_queues() +
                CacheHelper.cc_queues() +
                CacheHelper.consume_queues() +
                CacheHelper.miss_queues())

    @staticmethod
    def cachePhaseVar(key, value):
        """Store *value* under *key*, indexed in the ``VARCACHEKEY`` collection."""
        Cache().store(key, value, CacheHelper.VARCACHEKEY)

    @staticmethod
    def getPhaseVar(key):
        """Return the value stored under *key*, or None if unavailable."""
        return Cache().retrieve(key)

    @staticmethod
    def cacheClean():
        """Clear the workload, bucket-status, query, active-task and
        cluster-status collections.

        The template, query-builder and variable collections are not in the
        list below and are left untouched.
        """
        objCacheKeys = [CacheHelper.WORKLOADCACHEKEY,
                        CacheHelper.BUCKETSTATUSCACHEKEY,
                        CacheHelper.QUERYCACHEKEY,
                        CacheHelper.ACTIVETASKCACHEKEY,
                        CacheHelper.CLUSTERSTATUSKEY]

        for cacheKey in objCacheKeys:
            ObjCacher().clear(cacheKey)
202
203
def _as_bytes(part):
    """Coerce *part* to bytes so it can be fed to a hashlib digest."""
    if isinstance(part, bytes):
        return part
    text_type = type(u"")
    if not isinstance(part, text_type):
        part = str(part)  # e.g. integer ids
    if isinstance(part, text_type):
        part = part.encode("utf-8")
    return part


# ensure no collisions during object caching
def getContextKey(collectionKey, id_):
    """Return the cache key for object *id_* within *collectionKey*.

    The key is the MD5 hex digest of the collection key followed by the id,
    so the same id used in two collections maps to two different keys.
    Text is hashed as UTF-8; non-string ids are hashed via ``str(id_)``.
    """
    m = hashlib.md5()
    m.update(_as_bytes(collectionKey))
    m.update(_as_bytes(id_))
    return m.hexdigest()
210