18a2e0070SKeith Batten#
2de6976f2SBenjamin Young# Copyright 2012, Couchbase, Inc.
38a2e0070SKeith Batten# All Rights Reserved
48a2e0070SKeith Batten#
58a2e0070SKeith Batten# Licensed under the Apache License, Version 2.0 (the "License")
68a2e0070SKeith Batten# you may not use this file except in compliance with the License.
78a2e0070SKeith Batten# You may obtain a copy of the License at
88a2e0070SKeith Batten#
98a2e0070SKeith Batten#     http://www.apache.org/licenses/LICENSE-2.0
108a2e0070SKeith Batten#
118a2e0070SKeith Batten# Unless required by applicable law or agreed to in writing, software
128a2e0070SKeith Batten# distributed under the License is distributed on an "AS IS" BASIS,
138a2e0070SKeith Batten# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
148a2e0070SKeith Batten# See the License for the specific language governing permissions and
158a2e0070SKeith Batten# limitations under the License.
168a2e0070SKeith Batten#
178a2e0070SKeith Batten
188a2e0070SKeith Battenimport uuid
20d94edbbbSMike Wiederhold    import json
22d94edbbbSMike Wiederhold    import simplejson as json
238a2e0070SKeith Battenimport time
248a2e0070SKeith Battenfrom copy import deepcopy
254ee7e1b1SKeith Battenfrom threading import Thread, Lock
268a2e0070SKeith Battenimport urllib
27425e15e5SBenjamin Youngimport warnings
288a2e0070SKeith Battenimport logging
298a2e0070SKeith Batten
308a2e0070SKeith Battenfrom rest_client import RestConnection
31e2df5e5fSMike Wiederholdfrom couchbaseclient import CouchbaseClient
328a2e0070SKeith Batten
33425e15e5SBenjamin Young
34425e15e5SBenjamin Youngclass Couchbase(object):
358a2e0070SKeith Batten    def __init__(self, host, username, password):
368fca23d3SBenjamin Young        if (':' in host):
378fca23d3SBenjamin Young            [ip, port] = host.split(':')
388fca23d3SBenjamin Young        else:
398fca23d3SBenjamin Young            [ip, port] = host, 8091
408fca23d3SBenjamin Young
41d94edbbbSMike Wiederhold        server = {'ip': ip,
42d94edbbbSMike Wiederhold                  'port': port,
43d94edbbbSMike Wiederhold                  'username': username,
44d94edbbbSMike Wiederhold                  'password': password
454ee7e1b1SKeith Batten                  }
468a2e0070SKeith Batten
478a2e0070SKeith Batten        self.servers = [server]
484ee7e1b1SKeith Batten        self.servers_lock = Lock()
498a2e0070SKeith Batten
508a2e0070SKeith Batten        self.rest_username = username
518a2e0070SKeith Batten        self.rest_password = password
528a2e0070SKeith Batten
53d94edbbbSMike Wiederhold        server_config_uri = "http://%s:%s/pools/default" % (server['ip'],
54d94edbbbSMike Wiederhold                                                            server['port'])
55d94edbbbSMike Wiederhold        config = ServerHelper.parse_server_config(server_config_uri, username,
56d94edbbbSMike Wiederhold                                                  password)
57a2cdad2cSjzablocki        #couchApiBase will not be in node config before Couchbase Server 2.0
58920c82f1SPavel.Paulau        try:
59920c82f1SPavel.Paulau            self.couch_api_base = config["nodes"][0].get("couchApiBase")
60920c82f1SPavel.Paulau        except TypeError:
61920c82f1SPavel.Paulau            self.couch_api_base = "http://%s:8092/" % server['ip']
63d94edbbbSMike Wiederhold        self.streaming_thread = Thread(name="streaming",
64d94edbbbSMike Wiederhold                                       target=self._start_streaming, args=())
658a2e0070SKeith Batten        self.streaming_thread.daemon = True
668a2e0070SKeith Batten        self.streaming_thread.start()
678a2e0070SKeith Batten
688a2e0070SKeith Batten    def _start_streaming(self):
698a2e0070SKeith Batten        # this will dynamically update servers
70561e3537SKeith Batten        urlopener = urllib.FancyURLopener()
719750242aSBin Cui        urlopener.prompt_user_passwd = lambda host, realm: (self.rest_username,
729750242aSBin Cui                                                            self.rest_password)
734ee7e1b1SKeith Batten        current_servers = True
744ee7e1b1SKeith Batten        while current_servers:
754ee7e1b1SKeith Batten            self.servers_lock.acquire()
768a2e0070SKeith Batten            current_servers = deepcopy(self.servers)
774ee7e1b1SKeith Batten            self.servers_lock.release()
788a2e0070SKeith Batten            for server in current_servers:
79d94edbbbSMike Wiederhold                url = "http://%s:%s/poolsStreaming/default" % (server["ip"],
80d94edbbbSMike Wiederhold                                                               server["port"])
81561e3537SKeith Batten                f = urlopener.open(url)
828a2e0070SKeith Batten                while f:
838a2e0070SKeith Batten                    try:
848a2e0070SKeith Batten                        d = f.readline()
858a2e0070SKeith Batten                        if not d:
868a2e0070SKeith Batten                            # try next server if we get an EOF
878a2e0070SKeith Batten                            f.close()
888a2e0070SKeith Batten                            break
898a2e0070SKeith Batten                    except:
908a2e0070SKeith Batten                        # try next server if we fail to read
918a2e0070SKeith Batten                        f.close()
928a2e0070SKeith Batten                        break
938a2e0070SKeith Batten                    try:
948a2e0070SKeith Batten                        data = json.loads(d)
958a2e0070SKeith Batten                    except:
968a2e0070SKeith Batten                        continue
978a2e0070SKeith Batten
988a2e0070SKeith Batten                    new_servers = []
998a2e0070SKeith Batten                    nodes = data["nodes"]
1008a2e0070SKeith Batten                    for node in nodes:
101d94edbbbSMike Wiederhold                        if (node["clusterMembership"] == "active" and
102d94edbbbSMike Wiederhold                            node["status"] == "healthy"):
103d94edbbbSMike Wiederhold                            ip, port = node["hostname"].split(":")
104a2cdad2cSjzablocki                            couch_api_base = node.get("couchApiBase")
105d94edbbbSMike Wiederhold                            new_servers.append({"ip": ip,
106d94edbbbSMike Wiederhold                                                "port": port,
107d94edbbbSMike Wiederhold                                                "username": self.rest_username,
108d94edbbbSMike Wiederhold                                                "password": self.rest_password,
109d94edbbbSMike Wiederhold                                                "couchApiBase": couch_api_base
1108a2e0070SKeith Batten                                                })
111006c1aa8SBin Cui                    if new_servers:
112006c1aa8SBin Cui                        new_servers.sort()
113006c1aa8SBin Cui                        self.servers_lock.acquire()
114006c1aa8SBin Cui                        self.servers = deepcopy(new_servers)
115006c1aa8SBin Cui                        self.servers_lock.release()
1164ee7e1b1SKeith Batten
1178a2e0070SKeith Batten    def bucket(self, bucket_name):
1188a2e0070SKeith Batten        return Bucket(bucket_name, self)
1198a2e0070SKeith Batten
1208a2e0070SKeith Batten    def buckets(self):
1218a2e0070SKeith Batten        """Get a list of all buckets as Buckets"""
1228a2e0070SKeith Batten        rest = self._rest()
1238a2e0070SKeith Batten        buckets = []
124de450958SBenjamin Young        for rest_bucket in rest.get_buckets():
1258a2e0070SKeith Batten            buckets.append(Bucket(rest_bucket.name, self))
1268a2e0070SKeith Batten        return buckets
1278a2e0070SKeith Batten
128d94edbbbSMike Wiederhold    def create(self, bucket_name, bucket_password='', ram_quota_mb=100,
129d94edbbbSMike Wiederhold               replica=0):
1308a2e0070SKeith Batten        rest = self._rest()
1318a2e0070SKeith Batten        rest.create_bucket(bucket=bucket_name,
1328a2e0070SKeith Batten                           ramQuotaMB=ram_quota_mb,
1338a2e0070SKeith Batten                           authType='sasl',
1348a2e0070SKeith Batten                           saslPassword=bucket_password,
1358a2e0070SKeith Batten                           replicaNumber=replica,
1368a2e0070SKeith Batten                           bucketType='membase')
1378a2e0070SKeith Batten        ip, port, _, _ = self._rest_info()
1388a2e0070SKeith Batten
1398a2e0070SKeith Batten        while True:
1408a2e0070SKeith Batten            try:
1418a2e0070SKeith Batten                content = '{"basicStats":{"quotaPercentUsed":0.0}}'
142d94edbbbSMike Wiederhold                formatter_uri = "http://%s:%s/pools/default/buckets/%s"
143d94edbbbSMike Wiederhold                status, content = rest._http_request(formatter_uri %
144d94edbbbSMike Wiederhold                                                     (ip, port, bucket_name),
145d94edbbbSMike Wiederhold                                                     method='GET', params='',
146d94edbbbSMike Wiederhold                                                     headers=None, timeout=120)
1478a2e0070SKeith Batten            except ValueError:
1488a2e0070SKeith Batten                pass
1498a2e0070SKeith Batten            if json.loads(content)['basicStats']['quotaPercentUsed'] > 0.0:
1508a2e0070SKeith Batten                time.sleep(2)
1518a2e0070SKeith Batten                break
1528a2e0070SKeith Batten            time.sleep(1)
1538a2e0070SKeith Batten
1548a2e0070SKeith Batten        return Bucket(bucket_name, self)
1558a2e0070SKeith Batten
1568a2e0070SKeith Batten    def delete(self, bucket_name):
1578a2e0070SKeith Batten        rest = self._rest()
1588a2e0070SKeith Batten        rest.delete_bucket(bucket_name)
1598a2e0070SKeith Batten
1608a2e0070SKeith Batten    def __getitem__(self, key):
1618a2e0070SKeith Batten        return self.bucket(key)
1628a2e0070SKeith Batten
1638a2e0070SKeith Batten    def __iter__(self):
1648a2e0070SKeith Batten        return BucketIterator(self.buckets())
1658a2e0070SKeith Batten
1668a2e0070SKeith Batten    def _rest(self):
1674ee7e1b1SKeith Batten        self.servers_lock.acquire()
1684ee7e1b1SKeith Batten        server_info = deepcopy(self.servers[0])
1694ee7e1b1SKeith Batten        self.servers_lock.release()
1708a2e0070SKeith Batten        server_info['username'] = self.rest_username
1718a2e0070SKeith Batten        server_info['password'] = self.rest_password
172a2cdad2cSjzablocki        server_info['couchApiBase'] = self.couch_api_base
1738a2e0070SKeith Batten        rest = RestConnection(server_info)
1748a2e0070SKeith Batten        return rest
1758a2e0070SKeith Batten
1768a2e0070SKeith Batten    def _rest_info(self):
1774ee7e1b1SKeith Batten        self.servers_lock.acquire()
1784ee7e1b1SKeith Batten        server_info = deepcopy(self.servers[0])
1794ee7e1b1SKeith Batten        self.servers_lock.release()
180d94edbbbSMike Wiederhold        return (server_info['ip'], server_info['port'],
181d94edbbbSMike Wiederhold                server_info['username'], server_info['password'])
1828a2e0070SKeith Batten
1838a2e0070SKeith Batten
184425e15e5SBenjamin Youngclass Server(Couchbase):
185425e15e5SBenjamin Young    def __init__(self, host, username, password):
186d94edbbbSMike Wiederhold        warnings.warn("Server is deprecated; use Couchbase instead",
187d94edbbbSMike Wiederhold                      DeprecationWarning)
188425e15e5SBenjamin Young        Couchbase.__init__(self, host, username, password)
189425e15e5SBenjamin Young
1908a2e0070SKeith Batten
1918a2e0070SKeith Battenclass BucketIterator(object):
1928a2e0070SKeith Batten    def __init__(self, buckets):
1938a2e0070SKeith Batten        self.buckets = buckets
1948a2e0070SKeith Batten
1958a2e0070SKeith Batten    def __iter__(self):
1968a2e0070SKeith Batten        return self
1978a2e0070SKeith Batten
1988a2e0070SKeith Batten    def next(self):
1998a2e0070SKeith Batten        try:
2008a2e0070SKeith Batten            return self.buckets.pop(0)
2018a2e0070SKeith Batten        except IndexError:
2028a2e0070SKeith Batten            raise StopIteration
2038a2e0070SKeith Batten
2048a2e0070SKeith Batten
2058a2e0070SKeith Battenclass Bucket(object):
2068a2e0070SKeith Batten    def __init__(self, bucket_name, server):
2078a2e0070SKeith Batten        self.server = server
2088a2e0070SKeith Batten
2098a2e0070SKeith Batten        self.bucket_name = bucket_name
2108a2e0070SKeith Batten        rest = server._rest()
2118a2e0070SKeith Batten        self.bucket_password = rest.get_bucket(bucket_name).saslPassword
2128a2e0070SKeith Batten
2138a2e0070SKeith Batten        ip, port, rest_username, rest_password = server._rest_info()
214d94edbbbSMike Wiederhold        formatter_uri = "http://%s:%s/pools/default"
215e2df5e5fSMike Wiederhold        self.mc_client = CouchbaseClient(formatter_uri % (ip, port),
216e2df5e5fSMike Wiederhold                                         self.bucket_name,
217e2df5e5fSMike Wiederhold                                         self.bucket_password)
2188a2e0070SKeith Batten
2198a2e0070SKeith Batten    def append(self, key, value, cas=0):
2208a2e0070SKeith Batten        return self.mc_client.append(key, value, cas)
2218a2e0070SKeith Batten
2228a2e0070SKeith Batten    def prepend(self, key, value, cas=0):
2238a2e0070SKeith Batten        return self.mc_client.prepend(key, value, cas)
2248a2e0070SKeith Batten
2258a2e0070SKeith Batten    def incr(self, key, amt=1, init=0, exp=0):
2268a2e0070SKeith Batten        return self.mc_client.incr(key, amt, init, exp)
2278a2e0070SKeith Batten
2288a2e0070SKeith Batten    def decr(self, key, amt=1, init=0, exp=0):
2298a2e0070SKeith Batten        return self.mc_client.decr(key, amt, init, exp)
2308a2e0070SKeith Batten
2318a2e0070SKeith Batten    def set(self, key, expiration, flags, value):
2328a2e0070SKeith Batten        self.mc_client.set(key, expiration, flags, value)
2338a2e0070SKeith Batten
2348a2e0070SKeith Batten    def add(self, key, exp, flags, val):
2358a2e0070SKeith Batten        return self.mc_client.add(key, exp, flags, val)
2368a2e0070SKeith Batten
2378a2e0070SKeith Batten    def replace(self, key, exp, flags, val):
2388a2e0070SKeith Batten        return self.mc_client.replace(key, exp, flags, val)
2398a2e0070SKeith Batten
2408a2e0070SKeith Batten    def get(self, key):
2418a2e0070SKeith Batten        return self.mc_client.get(key)
2428a2e0070SKeith Batten
2438a2e0070SKeith Batten    def send_get(self, key):
2448a2e0070SKeith Batten        return self.mc_client.send_get(key)
2458a2e0070SKeith Batten
2468a2e0070SKeith Batten    def getl(self, key, exp=15):
2478a2e0070SKeith Batten        return self.mc_client.getl(key, exp)
2488a2e0070SKeith Batten
2498a2e0070SKeith Batten    def cas(self, key, exp, flags, oldVal, val):
2508a2e0070SKeith Batten        return self.mc_client.cas(key, exp, flags, oldVal, val)
2518a2e0070SKeith Batten
2528a2e0070SKeith Batten    def touch(self, key, exp):
2538a2e0070SKeith Batten        return self.mc_client.touch(key, exp)
2548a2e0070SKeith Batten
2558a2e0070SKeith Batten    def gat(self, key, exp):
2568a2e0070SKeith Batten        return self.mc_client.gat(key, exp)
2578a2e0070SKeith Batten
2588a2e0070SKeith Batten    def getMulti(self, keys):
2598a2e0070SKeith Batten        return self.mc_client.getMulti(keys)
2608a2e0070SKeith Batten
2618a2e0070SKeith Batten    def stats(self, sub=''):
2628a2e0070SKeith Batten        return self.mc_client.stats(sub)
2638a2e0070SKeith Batten
2648a2e0070SKeith Batten    def delete(self, key, cas=0):
265692108c0SKeith Batten        if key.startswith('_design/'):
266692108c0SKeith Batten            # this is a design doc, we need to handle it differently
267692108c0SKeith Batten            view = key.split('/')[1]
268692108c0SKeith Batten
269692108c0SKeith Batten            rest = self.server._rest()
270692108c0SKeith Batten            rest.delete_view(self.bucket_name, view)
271692108c0SKeith Batten        else:
272692108c0SKeith Batten            return self.mc_client.delete(key, cas)
2738a2e0070SKeith Batten
2748a2e0070SKeith Batten    def save(self, document):
2758a2e0070SKeith Batten        value = deepcopy(document)
2768a2e0070SKeith Batten        if '_id' in value:
2778a2e0070SKeith Batten            key = value['_id']
2788a2e0070SKeith Batten            del value['_id']
2798a2e0070SKeith Batten        else:
2808a2e0070SKeith Batten            key = str(uuid.uuid4())
2818a2e0070SKeith Batten        if '$flags' in value:
2828a2e0070SKeith Batten            flags = value['$flags']
2838a2e0070SKeith Batten            del value['$flags']
2848a2e0070SKeith Batten        else:
2858a2e0070SKeith Batten            flags = 0
2868a2e0070SKeith Batten        if '$expiration' in value:
28723795b3bSKeith Batten            expiration = value['$expiration']
2888a2e0070SKeith Batten            del value['$expiration']
2898a2e0070SKeith Batten        else:
2908a2e0070SKeith Batten            expiration = 0
2918a2e0070SKeith Batten
2928a2e0070SKeith Batten        if key.startswith('_design/'):
2938a2e0070SKeith Batten            # this is a design doc, we need to handle it differently
2948a2e0070SKeith Batten            view = key.split('/')[1]
2958a2e0070SKeith Batten
2968a2e0070SKeith Batten            rest = self.server._rest()
2978d4c79e1SPavel.Paulau            rest.create_design_doc(self.bucket_name, view, json.dumps(value))
2988a2e0070SKeith Batten        else:
2995382ab48SKeith Batten            if '_rev' in value:
300d94edbbbSMike Wiederhold                # couchbase works in clobber mode so for "set" _rev is useless
3015382ab48SKeith Batten                del value['_rev']
3028a2e0070SKeith Batten            self.set(key, expiration, flags, json.dumps(value))
3038a2e0070SKeith Batten
3048a2e0070SKeith Batten        return key
3058a2e0070SKeith Batten
3068a2e0070SKeith Batten    def __setitem__(self, key, value):
3078a2e0070SKeith Batten        if isinstance(value, dict):
3088a2e0070SKeith Batten            self.set(key, value['expiration'], value['flags'], value['value'])
3098a2e0070SKeith Batten        else:
3108a2e0070SKeith Batten            self.set(key, 0, 0, value)
3118a2e0070SKeith Batten
3128a2e0070SKeith Batten    def __getitem__(self, key):
3138a2e0070SKeith Batten        return self.get(key)
3148a2e0070SKeith Batten
3158a2e0070SKeith Batten    def view(self, view, **options):
3168a2e0070SKeith Batten        params = deepcopy(options)
3178a2e0070SKeith Batten        limit = None
3188a2e0070SKeith Batten        if 'limit' in params:
3198a2e0070SKeith Batten            limit = params['limit']
3208a2e0070SKeith Batten            del params['limit']
3218a2e0070SKeith Batten
32277d68295SKeith Batten        if view.startswith("_design/"):
32377d68295SKeith Batten            view_s = view.split('/')
32477d68295SKeith Batten            view_doc = view_s[1]
32577d68295SKeith Batten            view_map = view_s[3]
32677d68295SKeith Batten        else:
32777d68295SKeith Batten            view_doc = view
32877d68295SKeith Batten            view_map = None
3298a2e0070SKeith Batten
3308a2e0070SKeith Batten        rest = self.server._rest()
332d94edbbbSMike Wiederhold        results = rest.view_results(self.bucket_name, view_doc, view_map,
333d94edbbbSMike Wiederhold                                    params, limit)
3348a2e0070SKeith Batten        if 'rows' in results:
3358a2e0070SKeith Batten            return results['rows']
3368a2e0070SKeith Batten        else:
3378a2e0070SKeith Batten            return None
339d94edbbbSMike Wiederhold
340a2cdad2cSjzablockiclass ServerHelper(object):
341a2cdad2cSjzablocki    @staticmethod
342ded95566SBenjamin Young    def parse_server_config(uri, username="", password=""):
343a2cdad2cSjzablocki        urlopener = urllib.FancyURLopener()
344baf92406SBin Cui        if username and len(username) > 0 and password and len(password) > 0:
3459750242aSBin Cui            urlopener.prompt_user_passwd = lambda host, realm: (username, password)
346a2cdad2cSjzablocki        response = urlopener.open(uri)
348a2cdad2cSjzablocki        try:
349a2cdad2cSjzablocki            line = response.readline()
350a2cdad2cSjzablocki            data = json.loads(line)
351a2cdad2cSjzablocki            return data
352a2cdad2cSjzablocki        except:
353d94edbbbSMike Wiederhold            raise Exception("unexpected error - unable to parse server config"
354d94edbbbSMike Wiederhold                            " at %s" % (uri))