1#!/usr/bin/env python
2# -*- python -*-
3
4import pump
5import pump_transfer
6import pump_json
7
8import sys
9import time
10import zipfile
11import os
12import os.path
13import shutil
14import simplejson as json
15import tempfile
16
17from optparse import OptionParser
18
19from couchbase import client
20from couchbase.rest_client import RestConnection, RestHelper
21
class ZipUtil:
    """Extract all members of a zip archive into a directory tree.

    Stand-in for ZipFile.extractall (presumably kept for compatibility
    with Python versions that lack it -- TODO confirm).  Unlike
    extractall, it returns the destination root on success, or the path
    of a parent directory that could not be created on failure.
    """

    def __init__(self, zipobj):
        # zipobj: an already-opened zipfile.ZipFile instance.
        self.zipobj = zipobj

    def extractall(self, path=None):
        """Extract every archive member under `path` (default: cwd).

        Returns `path` on success; on a parent-directory creation
        failure, returns the directory path that could not be made.
        """
        if path is None:
            path = os.getcwd()
        # Drop a trailing separator, unless it is the root of a drive
        # (e.g. "C:\") where the separator is significant.
        if (path[-1] in (os.path.sep, os.path.altsep)
            and len(os.path.splitdrive(path)[1]) > 1):
            path = path[:-1]

        for member in self.zipobj.namelist():
            if not isinstance(member, zipfile.ZipInfo):
                member = self.zipobj.getinfo(member)

            # don't include leading "/" from file name if present
            if member.filename[0] == '/':
                targetpath = os.path.join(path, member.filename[1:])
            else:
                targetpath = os.path.join(path, member.filename)

            targetpath = os.path.normpath(targetpath)

            # Create all parent directories if necessary.
            upperdirs = os.path.dirname(targetpath)
            if upperdirs and not os.path.exists(upperdirs):
                try:
                    os.makedirs(upperdirs)
                except OSError:
                    # was a bare except + print; report to stderr like
                    # the rest of this module and bail out with the path
                    sys.stderr.write("Unexpected error: %s\n" % (sys.exc_info()[0],))
                    return upperdirs

            # Directory members end with '/'; just make sure they exist.
            if member.filename[-1] == '/':
                if not os.path.isdir(targetpath):
                    try:
                        os.mkdir(targetpath)
                    except OSError:
                        sys.stderr.write("Fail to create directory: %s\n" % (targetpath,))
                continue

            # file() is a deprecated builtin; open() + try/finally
            # guarantees the handle is closed even if the write fails.
            target = open(targetpath, "wb")
            try:
                target.write(self.zipobj.read(member.filename))
            finally:
                target.close()

        return path
67
68class DocLoader(pump_transfer.Transfer):
69
70    def parse_args(self, argv):
71        usage = "usage: %prog [options] <directory>|zipfile\n\n" + \
72                "Example: %prog -u Administrator -p password -n 127.0.0.1:8091 " + \
73                "-b mybucket -s 100 gamesim-sample.zip"
74
75        parser = OptionParser(usage)
76
77        username = os.environ.get('BUCKET_USERNAME', "")
78        password = os.environ.get('BUCKET_PASSWORD', "")
79
80        parser.add_option('-u', dest='username', default=username,
81                          help='Username', metavar='Administrator')
82        parser.add_option('-p', dest='password', default=password,
83                          help='Password', metavar='password')
84        parser.add_option('-b', dest='bucket',
85                          help='Bucket', metavar='mybucket')
86        parser.add_option('-n', dest='node', default='127.0.0.1:8091',
87                          help='Node address', metavar='127.0.0.1:8091')
88        parser.add_option('-s', dest='ram_quota', default=100, type='int',
89                          help='RAM quota in MB', metavar=100)
90
91        self.options, self.args = parser.parse_args(argv[1:])
92        if not self.args or not self.options.bucket:
93            parser.print_help()
94            sys.exit(1)
95
96        # check if the uploaded file exists
97        if not os.path.exists(self.args[0]):
98            sys.stderr.write("Invalid path: %s\n" % self.args[0])
99            sys.exit(1)
100
101    def opt_construct(self, argv):
102        sink_opts = {"node" : "http://"}
103        common_opts = {"bucket" : ["-B", None],
104                       "username" : ["-u", None],
105                       "password" : ["-p", None],
106                      }
107        count_opts = {"verbose" : ["-v", None]}
108
109        # parse options and arguments
110        self.parse_args(argv)
111
112        gen_str = "json://" + self.args[0]
113        sink_str = ""
114        for key in sink_opts.iterkeys():
115            val = getattr(self.options, key, None)
116            if val:
117                sink_str += sink_opts[key] + val
118
119        for key in common_opts.iterkeys():
120            val = getattr(self.options, key, None)
121            if val:
122                common_opts[key][1] = str(val)
123
124        for key in count_opts.iterkeys():
125            val = getattr(self.options, key, None)
126            if val:
127                count_opts[key][1] = int(val)
128
129        return gen_str, sink_str, common_opts, count_opts
130
131    def init_bucket(self):
132        server_info = {'ip': self.options.node.split(':')[0],
133                       'port': self.options.node.split(':')[1],
134                       'username': self.options.username,
135                       'password': self.options.password}
136
137        self.rest = RestConnection(server_info)
138        timeout_in_seconds = 120
139        if self.options.password:
140            uri = "http://%s:%s/nodes/self" % (server_info["ip"], server_info["port"])
141            status, content = self.rest._http_request(uri)
142            quotaUnused = -1
143            if status:
144                try:
145                    json_parsed = json.loads(content)
146                    quotaTotal = json_parsed["storageTotals"]["ram"]["quotaTotal"]
147                    quotaUnused = quotaTotal - json_parsed["storageTotals"]["ram"]["quotaUsed"]
148                except:
149                    pass
150            quotaUnused = quotaUnused / 1024.0
151            if quotaUnused > 0 and quotaUnused < self.options.ram_quota:
152                sys.stderr.write("RAM quota specified is too large to be provisioned into this cluster\n")
153                sys.stderr.write("Available RAM quota: %d, requested: %d\n" %\
154                    (quotaUnused, self.options.ram_quota))
155                sys.exit(1)
156            if not RestHelper(self.rest).bucket_exists(self.options.bucket):
157                self.rest.create_bucket(bucket=self.options.bucket,
158                                        ramQuotaMB=self.options.ram_quota,
159                                        authType='sasl')
160
161                start = time.time()
162                # Make sure the bucket exists before querying its status
163                bucket_exist = False
164                while (time.time() - start) <= timeout_in_seconds and not bucket_exist:
165                    bucket_exist = RestHelper(self.rest).bucket_exists(self.options.bucket)
166                    if bucket_exist:
167                        break
168                    else:
169                        sys.stderr.write(".")
170                        time.sleep(2)
171
172                if not bucket_exist:
173                    sys.stderr.write("Fail to create bucket '%s' within %s seconds\n" %\
174                          (self.options.bucket, timeout_in_seconds))
175                    sys.exit(1)
176
177        self.rest = RestConnection(server_info)
178        #Query status for all bucket nodes
179        uri = "http://%s:%s/pools/default/buckets/%s" % \
180            (server_info["ip"], server_info["port"], self.options.bucket)
181        all_node_ready = False
182        start = time.time()
183        while (time.time() - start) <= timeout_in_seconds and not all_node_ready:
184            status, content = self.rest._http_request(uri)
185            try:
186                json_data = json.loads(content)
187                all_node_ready = True
188                for node in json_data["nodes"]:
189                    if node["status"] != "healthy":
190                        all_node_ready = False
191                        break
192                if not all_node_ready:
193                    sys.stderr.write(".")
194                    time.sleep(2)
195            except Exception, err:
196                print "Exception:", err
197                break
198        if not all_node_ready:
199            sys.stderr.write("\nNode status is not ready after creating bucket '%s' within %s seconds" %\
200                  (self.options.bucket, timeout_in_seconds))
201            sys.exit(1)
202        else:
203            print "bucket creation is successful"
204
205    def save_doc(self, dockey, datafile):
206        raw_data = datafile.read()
207        try:
208            doc = json.loads(raw_data)
209            if '_id' in doc:
210                doc['_id'] = doc['_id'].encode('UTF-8')
211                self.bucket.save(doc)
212                for view in doc.get('views', []):
213                    self.views.append(doc['_id'] + '/_view/' + view)
214        except ValueError, error:
215            print error
216
217    def gen_dockey(self, filename):
218        return os.path.splitext(os.path.basename(filename))[0]
219
220    def enumerate_and_save(self, subdir=None):
221        if not subdir:
222            subdir = self.args[0]
223        subdirlist = list()
224        viewdirs = list()
225        for item in os.listdir(subdir):
226            if os.path.isfile(os.path.join(subdir, item)):
227                try:
228                    fp = open(os.path.join(subdir, item), 'r')
229                    dockey = self.gen_dockey(item)
230                    self.save_doc(dockey, fp)
231                    fp.close()
232                except IOError, error:
233                    print error
234            else:
235                if item.find("design_docs") > 0:
236                    viewdirs.append(os.path.join(subdir, item))
237                else:
238                    subdirlist.append(os.path.join(subdir, item))
239        for dir in subdirlist:
240            self.enumerate_and_save(dir)
241        for dir in viewdirs:
242            self.enumerate_and_save(dir)
243
244    def unzip_file_and_upload(self):
245        zfobj = zipfile.ZipFile(self.args[0])
246
247        working_dir = tempfile.mkdtemp()
248        ZipUtil(zfobj).extractall(working_dir)
249
250        self.enumerate_and_save(working_dir)
251        shutil.rmtree(working_dir)
252
253    def populate_docs(self):
254        cb = client.Couchbase(self.options.node,
255                              self.options.username,
256                              self.options.password)
257
258        self.bucket = cb[self.options.bucket]
259
260        #Retrieve and reset couchbase_api_base from server
261        self.bucket.server.couch_api_base = self.retrive_couch_api_base(cb)
262
263        self.views = list()
264
265        if self.args[0].endswith('.zip'):
266            self.unzip_file_and_upload()
267        else:
268            self.enumerate_and_save()
269
270    def retrive_couch_api_base(self, cb):
271        if (':' in self.options.node):
272            ip, port = self.options.node.split(':')
273        else:
274            ip, port = self.options.node, 8091
275
276        server_config_uri = "http://%s:%s/pools/default/buckets/%s" % (ip, port, self.options.bucket)
277        config = client.ServerHelper.parse_server_config(server_config_uri,
278                                                         self.options.username,
279                                                         self.options.password)
280        couch_api_base = config["nodes"][0].get("couchApiBase")
281
282        #Remove bucket suffix because it is added when saving design docs
283        couch_api_base = "/".join(couch_api_base.split("/")[:-1]) + "/"
284
285        return couch_api_base
286
287    def verify_queries(self):
288        for view in self.views:
289            self.bucket.view(view, stale="update_after")
290
291    def find_handlers(self, opts, source, sink):
292        return pump_json.JSONSource, pump.PumpingStation.find_handler(opts, sink, pump_transfer.SINKS)
293
294    def main(self, argv):
295
296        src, sink, common_opts, count_opts = self.opt_construct(argv)
297        local_args = [argv[0]]
298        local_args.append(src)
299        local_args.append(sink)
300        for v in common_opts.itervalues():
301            local_args.append(v[0])
302            local_args.append(v[1])
303
304        for v in count_opts.itervalues():
305            if v[1] is not None:
306                for i in range(v[1]):
307                    local_args.append(v[0])
308
309        # create new bucket if it doesn't exist
310        self.init_bucket()
311
312        #use cbtransfer to upload documents
313        pump_transfer.Transfer.main(self, local_args)
314
315        #upload documents
316        self.populate_docs()
317
318        # execute views at least once
319        self.verify_queries()
320
if __name__ == '__main__':
    # On Windows, make sure this script's directory and its parent (the
    # bin directory) come first on PATH so helper tools are found.
    if os.name == 'nt':
        script_dir = os.path.dirname(sys.argv[0])
        parent_dir = os.path.join(script_dir, '..')
        # os.pathsep is ';' on Windows
        os.environ['PATH'] = os.pathsep.join([script_dir, parent_dir,
                                              os.environ['PATH']])

    pump_transfer.exit_handler(DocLoader().main(sys.argv))
329