1#
2# Copyright 2012, Couchbase, Inc.
3# All Rights Reserved
4#
5# Licensed under the Apache License, Version 2.0 (the "License")
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16#
17
18import uuid
19try:
20    import json
21except:
22    import simplejson as json
23import time
24from copy import deepcopy
25from threading import Thread, Lock
26import urllib
27import warnings
28import logging
29
30from rest_client import RestConnection
31from couchbaseclient import CouchbaseClient
32
33
class Couchbase(object):
    """Handle on a Couchbase cluster reached through its REST interface.

    ``self.servers`` holds the known server dicts and is guarded by
    ``self.servers_lock``; a daemon thread started in ``__init__`` keeps it
    refreshed from the ``poolsStreaming`` endpoint.
    """

    def __init__(self, host, username, password):
        """Read the seed node's config and start the streaming thread.

        host -- "ip" or "ip:port"; port 8091 is used when none is given
        username, password -- REST credentials stored for later requests
        """
        if ':' in host:
            ip, port = host.split(':')
        else:
            ip, port = host, 8091

        server = {'ip': ip,
                  'port': port,
                  'username': username,
                  'password': password
                  }

        self.servers = [server]
        self.servers_lock = Lock()

        self.rest_username = username
        self.rest_password = password

        server_config_uri = "http://%s:%s/pools/default" % (server['ip'],
                                                            server['port'])
        config = ServerHelper.parse_server_config(server_config_uri, username,
                                                  password)
        #couchApiBase will not be in node config before Couchbase Server 2.0
        # NOTE(review): a missing "couchApiBase" key makes .get() return None;
        # the fallback below is only taken when the lookup itself raises
        # TypeError -- confirm that None is the intended value in that case.
        try:
            self.couch_api_base = config["nodes"][0].get("couchApiBase")
        except TypeError:
            self.couch_api_base = "http://%s:8092/" % server['ip']

        self.streaming_thread = Thread(name="streaming",
                                       target=self._start_streaming, args=())
        self.streaming_thread.daemon = True
        self.streaming_thread.start()

    def _start_streaming(self):
        """Follow poolsStreaming on each known server and refresh self.servers.

        This is the target of the daemon thread created in ``__init__``.
        """
        # this will dynamically update servers
        urlopener = urllib.FancyURLopener()
        urlopener.prompt_user_passwd = lambda host, realm: (self.rest_username,
                                                            self.rest_password)
        current_servers = True
        while current_servers:
            with self.servers_lock:
                current_servers = deepcopy(self.servers)
            for server in current_servers:
                url = "http://%s:%s/poolsStreaming/default" % (server["ip"],
                                                               server["port"])
                try:
                    stream = urlopener.open(url)
                except IOError:
                    # try next server if we fail to connect; pause so that an
                    # unreachable server is not retried in a tight loop
                    time.sleep(1)
                    continue
                self._consume_stream(stream)

    def _consume_stream(self, stream):
        """Apply updates read from one open stream until EOF or a read error."""
        try:
            while stream:
                try:
                    line = stream.readline()
                except Exception:
                    # try next server if we fail to read
                    break
                if not line:
                    # try next server if we get an EOF
                    break
                try:
                    data = json.loads(line)
                except (TypeError, ValueError):
                    # not a JSON document; wait for the next line
                    continue
                if not isinstance(data, dict):
                    continue

                new_servers = self._healthy_servers(data.get("nodes") or [])
                # an update without any usable node leaves the list untouched
                if new_servers:
                    new_servers.sort()
                    with self.servers_lock:
                        self.servers = deepcopy(new_servers)
        finally:
            # always release the connection, even on an unexpected error
            stream.close()

    def _healthy_servers(self, nodes):
        """Return server dicts for the nodes that are active and healthy."""
        servers = []
        for node in nodes:
            if (node.get("clusterMembership") == "active" and
                    node.get("status") == "healthy"):
                ip, port = node["hostname"].split(":")
                servers.append({"ip": ip,
                                "port": port,
                                "username": self.rest_username,
                                "password": self.rest_password,
                                "couchApiBase": node.get("couchApiBase")
                                })
        return servers

    def bucket(self, bucket_name):
        """Return a Bucket for bucket_name bound to this cluster."""
        return Bucket(bucket_name, self)

    def buckets(self):
        """Get a list of all buckets as Buckets"""
        rest = self._rest()
        return [Bucket(rest_bucket.name, self)
                for rest_bucket in rest.get_buckets()]

    def create(self, bucket_name, bucket_password='', ram_quota_mb=100,
               replica=0):
        """Create a SASL 'membase' bucket and return it once it reports usage.

        Polls the bucket's REST document once per second until
        basicStats.quotaPercentUsed is greater than zero. There is no upper
        bound on the wait.
        """
        rest = self._rest()
        rest.create_bucket(bucket=bucket_name,
                           ramQuotaMB=ram_quota_mb,
                           authType='sasl',
                           saslPassword=bucket_password,
                           replicaNumber=replica,
                           bucketType='membase')
        ip, port, _, _ = self._rest_info()
        bucket_uri = "http://%s:%s/pools/default/buckets/%s" % (ip, port,
                                                                bucket_name)

        while True:
            # stands for "not ready yet" when the request raises ValueError
            content = '{"basicStats":{"quotaPercentUsed":0.0}}'
            try:
                status, content = rest._http_request(bucket_uri,
                                                     method='GET', params='',
                                                     headers=None, timeout=120)
            except ValueError:
                pass
            if json.loads(content)['basicStats']['quotaPercentUsed'] > 0.0:
                # presumably lets the new bucket settle -- TODO confirm
                time.sleep(2)
                break
            time.sleep(1)

        return Bucket(bucket_name, self)

    def delete(self, bucket_name):
        """Delete the bucket called bucket_name through the REST connection."""
        rest = self._rest()
        rest.delete_bucket(bucket_name)

    def __getitem__(self, key):
        """Allow ``cluster[name]`` as shorthand for ``cluster.bucket(name)``."""
        return self.bucket(key)

    def __iter__(self):
        """Iterate over all buckets returned by ``buckets()``."""
        return BucketIterator(self.buckets())

    def _rest(self):
        """Return a RestConnection to the first known server."""
        # "with" releases the lock even if the lookup or copy raises
        with self.servers_lock:
            server_info = deepcopy(self.servers[0])
        server_info['username'] = self.rest_username
        server_info['password'] = self.rest_password
        server_info['couchApiBase'] = self.couch_api_base
        return RestConnection(server_info)

    def _rest_info(self):
        """Return (ip, port, username, password) of the first known server."""
        # "with" releases the lock even if the lookup or copy raises
        with self.servers_lock:
            server_info = deepcopy(self.servers[0])
        return (server_info['ip'], server_info['port'],
                server_info['username'], server_info['password'])
182
183
class Server(Couchbase):
    """Deprecated alias of Couchbase, kept for existing callers."""

    def __init__(self, host, username, password):
        """Warn about the deprecation, then initialise exactly like Couchbase."""
        # stacklevel=2 attributes the warning to the code instantiating Server
        # rather than to this line
        warnings.warn("Server is deprecated; use Couchbase instead",
                      DeprecationWarning, stacklevel=2)
        Couchbase.__init__(self, host, username, password)
189
190
class BucketIterator(object):
    """Iterator that yields the items of a list from front to back.

    The list passed in is consumed: every step removes its first element.
    """

    def __init__(self, buckets):
        """Keep a reference to buckets (the list is not copied)."""
        self.buckets = buckets

    def __iter__(self):
        return self

    def next(self):
        """Remove and return the first remaining item.

        Raises StopIteration once the list is empty.
        """
        try:
            return self.buckets.pop(0)
        except IndexError:
            raise StopIteration

    # Python 3 spelling of the iterator protocol; harmless under Python 2
    __next__ = next
203
204
class Bucket(object):
    """A single bucket: key/value calls go to a CouchbaseClient, design
    documents and views go through the owning server's REST connection.
    """

    def __init__(self, bucket_name, server):
        """Look up the bucket's SASL password and open the client.

        bucket_name -- name of an existing bucket
        server -- the owning object; its _rest() and _rest_info() are used
        """
        self.server = server

        self.bucket_name = bucket_name
        rest = server._rest()
        self.bucket_password = rest.get_bucket(bucket_name).saslPassword

        ip, port, _, _ = server._rest_info()
        formatter_uri = "http://%s:%s/pools/default"
        self.mc_client = CouchbaseClient(formatter_uri % (ip, port),
                                         self.bucket_name,
                                         self.bucket_password)

    def append(self, key, value, cas=0):
        """Delegate to the client's append and return its result."""
        return self.mc_client.append(key, value, cas)

    def prepend(self, key, value, cas=0):
        """Delegate to the client's prepend and return its result."""
        return self.mc_client.prepend(key, value, cas)

    def incr(self, key, amt=1, init=0, exp=0):
        """Delegate to the client's incr and return its result."""
        return self.mc_client.incr(key, amt, init, exp)

    def decr(self, key, amt=1, init=0, exp=0):
        """Delegate to the client's decr and return its result."""
        return self.mc_client.decr(key, amt, init, exp)

    def set(self, key, expiration, flags, value):
        """Delegate to the client's set and return its result.

        The result is returned so that set behaves like add and replace.
        """
        return self.mc_client.set(key, expiration, flags, value)

    def add(self, key, exp, flags, val):
        """Delegate to the client's add and return its result."""
        return self.mc_client.add(key, exp, flags, val)

    def replace(self, key, exp, flags, val):
        """Delegate to the client's replace and return its result."""
        return self.mc_client.replace(key, exp, flags, val)

    def get(self, key):
        """Delegate to the client's get and return its result."""
        return self.mc_client.get(key)

    def send_get(self, key):
        """Delegate to the client's send_get and return its result."""
        return self.mc_client.send_get(key)

    def getl(self, key, exp=15):
        """Delegate to the client's getl and return its result."""
        return self.mc_client.getl(key, exp)

    def cas(self, key, exp, flags, oldVal, val):
        """Delegate to the client's cas and return its result."""
        return self.mc_client.cas(key, exp, flags, oldVal, val)

    def touch(self, key, exp):
        """Delegate to the client's touch and return its result."""
        return self.mc_client.touch(key, exp)

    def gat(self, key, exp):
        """Delegate to the client's gat and return its result."""
        return self.mc_client.gat(key, exp)

    def getMulti(self, keys):
        """Delegate to the client's getMulti and return its result."""
        return self.mc_client.getMulti(keys)

    def stats(self, sub=''):
        """Delegate to the client's stats and return its result."""
        return self.mc_client.stats(sub)

    def delete(self, key, cas=0):
        """Delete a document, or a view when key starts with '_design/'.

        Returns the client's result for ordinary keys and None for design
        documents, which are removed through the REST connection.
        """
        if key.startswith('_design/'):
            # this is a design doc, we need to handle it differently
            view = key.split('/')[1]

            rest = self.server._rest()
            rest.delete_view(self.bucket_name, view)
        else:
            return self.mc_client.delete(key, cas)

    def save(self, document):
        """Store document as JSON and return the key it was stored under.

        The caller's document is not modified. '_id' supplies the key (a
        random UUID string is generated when absent); '$flags' and
        '$expiration' supply the flags and expiration (default 0). Keys
        starting with '_design/' are created as design documents over REST.
        """
        value = deepcopy(document)
        if '_id' in value:
            key = value.pop('_id')
        else:
            key = str(uuid.uuid4())
        flags = value.pop('$flags', 0)
        expiration = value.pop('$expiration', 0)

        if key.startswith('_design/'):
            # this is a design doc, we need to handle it differently
            view = key.split('/')[1]

            rest = self.server._rest()
            rest.create_design_doc(self.bucket_name, view, json.dumps(value))
        else:
            # couchbase works in clobber mode so for "set" _rev is useless
            value.pop('_rev', None)
            self.set(key, expiration, flags, json.dumps(value))

        return key

    def __setitem__(self, key, value):
        """Store value under key.

        A dict value must provide 'expiration', 'flags' and 'value' entries
        (KeyError otherwise); any other value is stored with expiration 0
        and flags 0.
        """
        if isinstance(value, dict):
            self.set(key, value['expiration'], value['flags'], value['value'])
        else:
            self.set(key, 0, 0, value)

    def __getitem__(self, key):
        """Allow ``bucket[key]`` as shorthand for ``bucket.get(key)``."""
        return self.get(key)

    def view(self, view, **options):
        """Query a view and return its rows, or None when there are none.

        view -- a design document name, or a '_design/...' path whose
                second and fourth '/'-separated parts give the design
                document and the view name
        options -- query parameters; 'limit' is taken out and passed to the
                   REST call separately
        """
        params = deepcopy(options)
        limit = params.pop('limit', None)

        if view.startswith("_design/"):
            view_s = view.split('/')
            view_doc = view_s[1]
            view_map = view_s[3]
        else:
            view_doc = view
            view_map = None

        rest = self.server._rest()

        results = rest.view_results(self.bucket_name, view_doc, view_map,
                                    params, limit)
        if 'rows' in results:
            return results['rows']
        else:
            return None
338
339
class ServerHelper(object):
    """Static helpers for reading a server's REST configuration."""

    @staticmethod
    def parse_server_config(uri, username="", password=""):
        """Fetch uri and return the JSON document found on its first line.

        uri -- URL to open
        username, password -- used to answer an authentication prompt; only
                              applied when both are non-empty

        Raises Exception when the response cannot be read or parsed. Errors
        raised while opening the URL propagate unchanged.
        """
        urlopener = urllib.FancyURLopener()
        if username and password:
            urlopener.prompt_user_passwd = lambda host, realm: (username, password)
        response = urlopener.open(uri)

        try:
            line = response.readline()
            return json.loads(line)
        except Exception:
            raise Exception("unexpected error - unable to parse server config"
                            " at %s" % (uri))
        finally:
            # release the connection whether or not parsing succeeded
            response.close()
355