1#!/usr/bin/env python
2# -*- python -*-
3
4import sys
5import time
6import zipfile
7import os
8import os.path
9import shutil
10import simplejson as json
11import tempfile
12
13from optparse import OptionParser
14
15from couchbase import client
16from couchbase.rest_client import RestConnection, RestHelper
17
class ZipUtil:
    """Extract every member of an already-open ``zipfile.ZipFile``.

    The wrapped archive (``zipobj``) is only read from; it is never closed
    by this class.
    """

    def __init__(self, zipobj):
        self.zipobj = zipobj

    def extractall(self, path=None):
        """Extract all archive members below *path*.

        path -- destination directory; defaults to the current working
                directory. One trailing path separator is stripped.

        Returns the (stripped) destination path. If a parent directory for
        a member cannot be created, extraction stops and the directory that
        could not be created is returned instead.

        Members whose names would resolve outside the destination (for
        example names containing "..") are reported and skipped, so an
        archive cannot write to arbitrary locations.
        """
        if path is None:
            path = os.getcwd()
        if (path and path[-1] in (os.path.sep, os.path.altsep)
                and len(os.path.splitdrive(path)[1]) > 1):
            path = path[:-1]

        # Absolute form of the destination, used to confine every member.
        root = os.path.abspath(path)
        root_prefix = root.rstrip(os.path.sep) + os.path.sep

        for member in self.zipobj.infolist():
            name = member.filename
            if not name:
                continue

            # don't include leading "/" from file name if present
            if name[0] == '/':
                targetpath = os.path.join(path, name[1:])
            else:
                targetpath = os.path.join(path, name)

            targetpath = os.path.normpath(targetpath)

            # Refuse members that escape the destination directory.
            resolved = os.path.abspath(targetpath)
            if resolved != root and not resolved.startswith(root_prefix):
                print("Skipping unsafe archive member: %s" % (name,))
                continue

            # Create all parent directories if necessary.
            upperdirs = os.path.dirname(targetpath)
            if upperdirs and not os.path.exists(upperdirs):
                try:
                    os.makedirs(upperdirs)
                except OSError:
                    print("Unexpected error: %s" % (sys.exc_info()[0],))
                    return upperdirs

            # A name ending in "/" is a directory entry: create it and move on.
            if name[-1] == '/':
                if not os.path.isdir(targetpath):
                    try:
                        os.mkdir(targetpath)
                    except OSError:
                        print("Fail to create directory: %s" % (targetpath,))
                continue

            target = open(targetpath, "wb")
            try:
                target.write(self.zipobj.read(name))
            finally:
                # Close the handle even when reading or writing fails.
                target.close()

        return path
63
class DocLoader:
    """Load JSON documents from a directory or zip file into a bucket.

    The methods are meant to be called in this order: parse_args(),
    init_bucket(), populate_docs(), verify_queries().
    """

    def parse_args(self):
        """Parse the command line into ``self.options`` and ``self.args``.

        Username and password default to the REST_USERNAME / REST_PASSWORD
        environment variables. Prints the help text and exits when the
        source argument or the bucket option is missing.
        """
        usage = "usage: %prog [options] <directory>|zipfile\n\n" + \
                "Example: %prog -u Administrator -p password -n 127.0.0.1:8091 " + \
                "-b mybucket -s 100 gamesim-sample.zip"

        parser = OptionParser(usage)

        username = os.environ.get('REST_USERNAME', None)
        password = os.environ.get('REST_PASSWORD', None)

        parser.add_option('-u', dest='username', default=username,
                          help='Username', metavar='Administrator')
        parser.add_option('-p', dest='password', default=password,
                          help='Password', metavar='password')
        parser.add_option('-b', dest='bucket',
                          help='Bucket', metavar='mybucket')
        parser.add_option('-n', dest='node', default='127.0.0.1:8091',
                          help='Node address', metavar='127.0.0.1:8091')
        parser.add_option('-s', dest='ram_quota', default=100, type='int',
                          help='RAM quota in MB', metavar=100)

        self.options, self.args = parser.parse_args()

        if not self.args or not self.options.bucket:
            parser.print_help()
            sys.exit()

        # NOTE(review): this echoes every option, including the password,
        # to stdout -- consider masking or removing it.
        print("%s %s" % (self.options, self.args))

    def _split_node(self):
        """Return (ip, port) taken from the -n option.

        The port defaults to '8091' when the option carries no ':' so that
        every caller treats a bare host name the same way.
        """
        parts = self.options.node.split(':')
        if len(parts) > 1:
            return parts[0], parts[1]
        return parts[0], '8091'

    def init_bucket(self):
        """Create the target bucket when it does not exist yet.

        Stores the REST connection in ``self.rest``. Exits when the unused
        RAM quota reported by the node is positive but smaller than the
        requested quota.
        """
        ip, port = self._split_node()
        server_info = {'ip': ip,
                       'port': port,
                       'username': self.options.username,
                       'password': self.options.password}

        self.rest = RestConnection(server_info)
        uri = "http://%s:%s/nodes/self" % (server_info["ip"], server_info["port"])
        status, content = self.rest._http_request(uri)
        quotaUnused = -1
        if status:
            try:
                ram = json.loads(content)["storageTotals"]["ram"]
                quotaUnused = ram["quotaTotal"] - ram["quotaUsed"]
            except (ValueError, KeyError, TypeError):
                # Best effort: an unparsable or incomplete answer leaves
                # quotaUnused negative, which skips the check below.
                pass
        # NOTE(review): the reported value is divided by 1024 and then
        # compared with -s (documented as MB); the unit the server reports
        # is not visible here -- TODO confirm.
        quotaUnused = quotaUnused / 1024.0
        if quotaUnused > 0 and quotaUnused < self.options.ram_quota:
            print("RAM quota specified is too large to be provisioned into this cluster")
            print("Available RAM quota: %d, requested: %d" %
                  (quotaUnused, self.options.ram_quota))
            sys.exit()

        if not RestHelper(self.rest).bucket_exists(self.options.bucket):
            self.rest.create_bucket(bucket=self.options.bucket,
                                    ramQuotaMB=self.options.ram_quota,
                                    authType='sasl')
            # Fixed pause after creation; presumably gives the bucket time
            # to become usable -- TODO confirm.
            time.sleep(10)

    def save_doc(self, dockey, datafile):
        """Store the JSON document read from *datafile*.

        dockey   -- key used when the document carries no '_id' field
        datafile -- open file object; its whole content is read

        Documents with an '_id' are saved through ``bucket.save`` and each
        of their 'views' is remembered in ``self.views``. Everything else
        is stored verbatim under *dockey*. Invalid JSON is printed and
        skipped.
        """
        raw_data = datafile.read()
        try:
            doc = json.loads(raw_data)
            # Non-object JSON (lists, numbers, ...) cannot carry an '_id'.
            if not isinstance(doc, dict) or '_id' not in doc:
                self.bucket.set(dockey, 0, 0, raw_data)
            else:
                doc_id = doc['_id']
                # Python 2 unicode ids become UTF-8 byte strings; native
                # str ids are kept as they are.
                if not isinstance(doc_id, str):
                    doc_id = doc_id.encode('UTF-8')
                doc['_id'] = doc_id
                self.bucket.save(doc)
                for view in doc.get('views', []):
                    self.views.append(doc['_id'] + '/_view/' + view)
        except ValueError:
            print(sys.exc_info()[1])

    def gen_dockey(self, filename):
        """Return the base name of *filename* without its extension."""
        return os.path.splitext(os.path.basename(filename))[0]

    def enumerate_and_save(self, subdir=None):
        """Recursively upload every file found below *subdir*.

        subdir -- directory to walk; defaults to the first positional
                  command-line argument.

        Directories whose name contains "design_docs" are handled after
        all other sub-directories of the same level.
        """
        if not subdir:
            subdir = self.args[0]
        subdirlist = []
        viewdirs = []
        for item in os.listdir(subdir):
            fullpath = os.path.join(subdir, item)
            if os.path.isfile(fullpath):
                try:
                    fp = open(fullpath, 'r')
                    try:
                        self.save_doc(self.gen_dockey(item), fp)
                    finally:
                        # Close the file even when saving fails.
                        fp.close()
                except IOError:
                    print(sys.exc_info()[1])
            elif not os.path.isdir(fullpath):
                # Neither file nor directory (e.g. a broken link): skip.
                continue
            elif "design_docs" in item:
                # Substring test also matches a name that *starts* with
                # "design_docs", which find() > 0 used to miss.
                viewdirs.append(fullpath)
            else:
                subdirlist.append(fullpath)
        for directory in subdirlist:
            self.enumerate_and_save(directory)
        for directory in viewdirs:
            self.enumerate_and_save(directory)

    def unzip_file_and_upload(self):
        """Extract the zip file given on the command line and upload it.

        The archive is unpacked into a freshly created private temporary
        directory which is removed again afterwards, even on failure.
        """
        working_dir = tempfile.mkdtemp(prefix='_cbdocloader_working')
        try:
            zfobj = zipfile.ZipFile(self.args[0])
            try:
                ZipUtil(zfobj).extractall(working_dir)
            finally:
                zfobj.close()

            self.enumerate_and_save(working_dir)
        finally:
            shutil.rmtree(working_dir)

    def populate_docs(self):
        """Connect to the bucket and upload the documents.

        Sets ``self.bucket`` and resets ``self.views`` before uploading
        from either a '.zip' file or a directory.
        """
        cb = client.Couchbase(self.options.node,
                              self.options.username,
                              self.options.password)

        self.bucket = cb[self.options.bucket]

        # Retrieve and reset couch_api_base from server
        self.bucket.server.couch_api_base = self.retrive_couch_api_base(cb)

        self.views = []

        if self.args[0].endswith('.zip'):
            self.unzip_file_and_upload()
        else:
            self.enumerate_and_save()

    def retrive_couch_api_base(self, cb):
        """Return the couchApiBase of the first node without its last segment.

        cb -- unused; kept for interface compatibility.
        """
        ip, port = self._split_node()

        server_config_uri = "http://%s:%s/pools/default/buckets/%s" % (
            ip, port, self.options.bucket)
        config = client.ServerHelper.parse_server_config(server_config_uri,
                                                         self.options.username,
                                                         self.options.password)
        couch_api_base = config["nodes"][0].get("couchApiBase")

        # Remove bucket suffix because it is added when saving design docs
        couch_api_base = "/".join(couch_api_base.split("/")[:-1]) + "/"

        return couch_api_base

    def verify_queries(self):
        """Query every view recorded in ``self.views`` once."""
        for view in self.views:
            print("View: %s" % (view,))
            self.bucket.view(view, stale="update_after")
218
def main():
    """Run the loader: parse options, prepare the bucket, upload, query views."""
    # On Windows, put the script directory and its parent in front of PATH.
    if os.name == 'nt':
        script_dir = os.path.dirname(sys.argv[0])
        search_path = [script_dir,
                       os.path.join(script_dir, '..'),
                       os.environ['PATH']]
        os.environ['PATH'] = ';'.join(search_path)

    loader = DocLoader()

    # Steps, in order:
    #   1. parse options and arguments
    #   2. create new bucket if it doesn't exist
    #   3. upload documents
    #   4. execute views at least once
    steps = (loader.parse_args,
             loader.init_bucket,
             loader.populate_docs,
             loader.verify_queries)
    for step in steps:
        step()
239
if __name__ == '__main__':
    main()
    # os._exit() ends the process immediately without flushing stdio
    # buffers, so flush explicitly or redirected output may be lost.
    # NOTE(review): the hard exit is presumably there to avoid waiting on
    # lingering client threads -- confirm before replacing it.
    sys.stdout.flush()
    sys.stderr.flush()
    os._exit(0)
243