1#!/usr/bin/env python
2# -*- python -*-
3
4import pump
5import pump_transfer
6import pump_json
7import util_cli as util
8
9import sys
10import time
11import zipfile
12import os
13import os.path
14import shutil
15import simplejson as json
16import tempfile
17
18from optparse import OptionParser
19
20from couchbase import client
21from couchbase.rest_client import RestConnection, RestHelper
22
class ZipUtil:
    """Extract all members of a zip archive to a target directory.

    This is a replacement for ZipFile.extractall() that tolerates
    absolute member names (leading '/') and reports, rather than
    raises, directory-creation failures.
    """

    def __init__(self, zipobj):
        # zipobj: an already-open zipfile.ZipFile to extract from.
        self.zipobj = zipobj

    def extractall(self, path=None):
        """Extract every archive member under `path`.

        Returns `path` on success.  On a directory-creation failure the
        partially-created parent directory path is returned instead
        (matching the original tool's best-effort contract).
        """
        if path is None:
            path = os.getcwd()
        # Strip one trailing separator, but never reduce a bare drive
        # root (e.g. "C:\") to an invalid path on Windows.
        if (path[-1] in (os.path.sep, os.path.altsep)
            and len(os.path.splitdrive(path)[1]) > 1):
            path = path[:-1]

        for member in self.zipobj.namelist():
            if not isinstance(member, zipfile.ZipInfo):
                member = self.zipobj.getinfo(member)

            # don't include leading "/" from file name if present
            if member.filename[0] == '/':
                targetpath = os.path.join(path, member.filename[1:])
            else:
                targetpath = os.path.join(path, member.filename)

            targetpath = os.path.normpath(targetpath)

            # Create all parent directories if necessary.
            upperdirs = os.path.dirname(targetpath)
            if upperdirs and not os.path.exists(upperdirs):
                try:
                    os.makedirs(upperdirs)
                except OSError:
                    # was a bare except + print to stdout; narrow the
                    # catch and report on stderr instead
                    sys.stderr.write("Unexpected error: %s\n" %
                                     (sys.exc_info()[0],))
                    return upperdirs

            # A trailing '/' marks a directory entry: create it (if
            # needed) and move on; there is no file body to write.
            if member.filename[-1] == '/':
                if not os.path.isdir(targetpath):
                    try:
                        os.mkdir(targetpath)
                    except OSError:
                        sys.stderr.write("Fail to create directory: %s\n" %
                                         targetpath)
                continue

            # `with` replaces the removed file() builtin and guarantees
            # the handle is closed even if the write fails.
            with open(targetpath, "wb") as target:
                target.write(self.zipobj.read(member.filename))

        return path
68
class DocLoader(pump_transfer.Transfer):
    """Load sample JSON documents into a Couchbase bucket.

    Takes a directory tree (or a .zip of one) of JSON documents plus
    design docs, creates the target bucket if needed, bulk-loads the
    plain documents via the pump/cbtransfer machinery, saves design
    docs through the couchbase client, and finally queries each view
    once so indexes start building.
    """

    def parse_args(self, argv):
        """Parse command-line options into self.options / self.args.

        Exits with status 1 (after printing help or an error) when the
        bucket name is missing or the source path does not exist.
        """
        usage = "usage: %prog [options] <directory>|zipfile\n\n" + \
                "Example: %prog -u Administrator -p password -n 127.0.0.1:8091 " + \
                "-b mybucket -s 100 gamesim-sample.zip"

        parser = OptionParser(usage)

        # Credentials may come from the environment; command-line
        # flags override these defaults.
        username = os.environ.get('BUCKET_USERNAME', "")
        password = os.environ.get('BUCKET_PASSWORD', "")

        parser.add_option('-u', dest='username', default=username,
                          help='Username', metavar='Administrator')
        parser.add_option('-p', dest='password', default=password,
                          help='Password', metavar='password')
        parser.add_option('-b', dest='bucket',
                          help='Bucket', metavar='mybucket')
        parser.add_option('-n', dest='node', default='127.0.0.1:8091',
                          help='Node address', metavar='127.0.0.1:8091')
        parser.add_option('-s', dest='ram_quota', default=100, type='int',
                          help='RAM quota in MB', metavar=100)

        self.options, self.args = parser.parse_args(argv[1:])
        if not self.args or not self.options.bucket:
            parser.print_help()
            sys.exit(1)

        # check if the uploaded file exists
        if not os.path.exists(self.args[0]):
            sys.stderr.write("Invalid path: %s\n" % self.args[0])
            sys.exit(1)

    def opt_construct(self, argv):
        """Translate this tool's options into pump_transfer-style
        arguments.

        Returns (generator_string, sink_string, common_opts,
        count_opts) where the option dicts map option name ->
        [flag, value] for later expansion in main().
        """
        sink_opts = {"node" : "http://"}
        common_opts = {"bucket" : ["-B", None],
                       "username" : ["-u", None],
                       "password" : ["-p", None],
                      }
        # Repeatable flags: value is a count of how many times to emit
        # the flag (e.g. -v -v).
        count_opts = {"verbose" : ["-v", None]}

        # parse options and arguments
        self.parse_args(argv)

        # Source is the json:// generator over the given path.
        gen_str = "json://" + self.args[0]
        sink_str = ""
        for key in sink_opts.iterkeys():
            val = getattr(self.options, key, None)
            if val:
                sink_str += sink_opts[key] + val

        for key in common_opts.iterkeys():
            val = getattr(self.options, key, None)
            if val:
                common_opts[key][1] = str(val)

        for key in count_opts.iterkeys():
            val = getattr(self.options, key, None)
            if val:
                count_opts[key][1] = int(val)

        return gen_str, sink_str, common_opts, count_opts

    def init_bucket(self):
        """Create the target bucket if it does not exist and wait
        until every node reports it healthy.

        Exits with status 1 when the quota is too large, the bucket
        cannot be created in time, or node status never becomes ready.
        """
        host, port= util.hostport(self.options.node)
        server_info = {'ip': host,
                       'port': port,
                       'username': self.options.username,
                       'password': self.options.password}

        self.rest = RestConnection(server_info)
        timeout_in_seconds = 120
        # Bucket creation requires admin credentials, so only attempt
        # it when a password was supplied.
        if self.options.password:
            uri = "http://%s:%s/nodes/self" % (server_info["ip"], server_info["port"])
            status, content = self.rest._http_request(uri)
            quotaUnused = -1
            if status:
                try:
                    json_parsed = json.loads(content)
                    quotaTotal = json_parsed["storageTotals"]["ram"]["quotaTotal"]
                    quotaUnused = quotaTotal - json_parsed["storageTotals"]["ram"]["quotaUsed"]
                except:
                    pass
            # NOTE(review): quotaUnused is divided by 1024 once before
            # being compared against ram_quota (documented as MB) --
            # confirm the REST API's quota unit; bytes would need two
            # divisions, KB only one.
            quotaUnused = quotaUnused / 1024.0
            if quotaUnused > 0 and quotaUnused < self.options.ram_quota:
                sys.stderr.write("RAM quota specified is too large to be provisioned into this cluster\n")
                sys.stderr.write("Available RAM quota: %d, requested: %d\n" %\
                    (quotaUnused, self.options.ram_quota))
                sys.exit(1)
            if not RestHelper(self.rest).bucket_exists(self.options.bucket):
                self.rest.create_bucket(bucket=self.options.bucket,
                                        ramQuotaMB=self.options.ram_quota,
                                        authType='sasl')

                start = time.time()
                # Make sure the bucket exists before querying its status
                bucket_exist = False
                while (time.time() - start) <= timeout_in_seconds and not bucket_exist:
                    bucket_exist = RestHelper(self.rest).bucket_exists(self.options.bucket)
                    if bucket_exist:
                        break
                    else:
                        sys.stderr.write(".")
                        time.sleep(2)

                if not bucket_exist:
                    sys.stderr.write("Fail to create bucket '%s' within %s seconds\n" %\
                          (self.options.bucket, timeout_in_seconds))
                    sys.exit(1)

        self.rest = RestConnection(server_info)
        #Query status for all bucket nodes
        uri = "http://%s:%s/pools/default/buckets/%s" % \
            (server_info["ip"], server_info["port"], self.options.bucket)
        all_node_ready = False
        start = time.time()
        # Poll every 2s (up to the timeout) until every node hosting
        # the bucket reports status "healthy".
        while (time.time() - start) <= timeout_in_seconds and not all_node_ready:
            status, content = self.rest._http_request(uri)
            try:
                json_data = json.loads(content)
                all_node_ready = True
                for node in json_data["nodes"]:
                    if node["status"] != "healthy":
                        all_node_ready = False
                        break
                if not all_node_ready:
                    sys.stderr.write(".")
                    time.sleep(2)
            except Exception, err:
                print "Exception:", err
                break
        if not all_node_ready:
            sys.stderr.write("\nNode status is not ready after creating bucket '%s' within %s seconds" %\
                  (self.options.bucket, timeout_in_seconds))
            sys.exit(1)
        else:
            print "bucket creation is successful"

    def save_doc(self, dockey, datafile):
        """Parse one JSON file and save it through the couchbase
        client; record any views it declares for later querying.

        NOTE(review): `dockey` is currently unused -- documents are
        saved only when they carry their own '_id'; files without one
        are silently skipped.  Confirm that is intended.
        """
        raw_data = datafile.read()
        try:
            doc = json.loads(raw_data)
            if '_id' in doc:
                doc['_id'] = doc['_id'].encode('UTF-8')
                self.bucket.save(doc)
                # Iterating the 'views' mapping yields view names;
                # build "<design_doc_id>/_view/<view>" query paths.
                for view in doc.get('views', []):
                    self.views.append(doc['_id'] + '/_view/' + view)
        except ValueError, error:
            print error

    def gen_dockey(self, filename):
        """Return the file's base name without extension, used as the
        document key."""
        return os.path.splitext(os.path.basename(filename))[0]

    def enumerate_and_save(self, subdir=None):
        """Recursively walk `subdir` (defaults to the source path),
        saving each file as a document.

        Directories containing "design_docs" in their name are
        deferred until after all other subdirectories so design docs
        are saved last.  NOTE(review): the `find(...) > 0` test misses
        a directory named exactly "design_docs" (find returns 0) --
        confirm whether sample layouts always prefix the name.
        """
        if not subdir:
            subdir = self.args[0]
        subdirlist = list()
        viewdirs = list()
        for item in os.listdir(subdir):
            if os.path.isfile(os.path.join(subdir, item)):
                try:
                    fp = open(os.path.join(subdir, item), 'r')
                    dockey = self.gen_dockey(item)
                    self.save_doc(dockey, fp)
                    fp.close()
                except IOError, error:
                    print error
            else:
                if item.find("design_docs") > 0:
                    viewdirs.append(os.path.join(subdir, item))
                else:
                    subdirlist.append(os.path.join(subdir, item))
        for dir in subdirlist:
            self.enumerate_and_save(dir)
        for dir in viewdirs:
            self.enumerate_and_save(dir)

    def unzip_file_and_upload(self):
        """Extract the source .zip into a temp directory, upload its
        contents, then remove the temp directory."""
        zfobj = zipfile.ZipFile(self.args[0])

        working_dir = tempfile.mkdtemp()
        ZipUtil(zfobj).extractall(working_dir)

        self.enumerate_and_save(working_dir)
        shutil.rmtree(working_dir)

    def populate_docs(self):
        """Connect to the bucket and upload documents from the source
        path (zip archive or directory)."""
        cb = client.Couchbase(self.options.node,
                              self.options.username,
                              self.options.password)

        self.bucket = cb[self.options.bucket]

        #Retrieve and reset couchbase_api_base from server
        self.bucket.server.couch_api_base = self.retrive_couch_api_base(cb)

        # View query paths collected by save_doc().
        self.views = list()

        if self.args[0].endswith('.zip'):
            self.unzip_file_and_upload()
        else:
            self.enumerate_and_save()

    def retrive_couch_api_base(self, cb):
        """Fetch couchApiBase for the bucket's first node from the
        server config and strip the trailing bucket segment.

        (Method name typo "retrive" kept for interface compatibility.)
        """
        if (':' in self.options.node):
            ip, port = self.options.node.split(':')
        else:
            ip, port = self.options.node, 8091

        server_config_uri = "http://%s:%s/pools/default/buckets/%s" % (ip, port, self.options.bucket)
        config = client.ServerHelper.parse_server_config(server_config_uri,
                                                         self.options.username,
                                                         self.options.password)
        couch_api_base = config["nodes"][0].get("couchApiBase")

        #Remove bucket suffix because it is added when saving design docs
        couch_api_base = "/".join(couch_api_base.split("/")[:-1]) + "/"

        return couch_api_base

    def verify_queries(self):
        """Query every recorded view once (stale=update_after) so the
        server starts building its index."""
        for view in self.views:
            self.bucket.view(view, stale="update_after")

    def find_handlers(self, opts, source, sink):
        """Pin the source handler to JSONSource; resolve the sink from
        the standard pump_transfer sink table."""
        return pump_json.JSONSource, pump.PumpingStation.find_handler(opts, sink, pump_transfer.SINKS)

    def main(self, argv):
        """Tool entry point: build pump args, ensure the bucket
        exists, bulk-transfer documents, save design docs, and touch
        each view once."""

        src, sink, common_opts, count_opts = self.opt_construct(argv)
        # Rebuild an argv for pump_transfer: program name, source,
        # sink, then the expanded option flags.
        local_args = [argv[0]]
        local_args.append(src)
        local_args.append(sink)
        for v in common_opts.itervalues():
            local_args.append(v[0])
            local_args.append(v[1])

        # Count-style options repeat their flag N times (e.g. -v -v).
        for v in count_opts.itervalues():
            if v[1] is not None:
                for i in range(v[1]):
                    local_args.append(v[0])

        # create new bucket if it doesn't exist
        self.init_bucket()

        #use cbtransfer to upload documents
        pump_transfer.Transfer.main(self, local_args)

        #upload documents
        self.populate_docs()

        # execute views at least once
        self.verify_queries()
322
if __name__ == '__main__':
    # On Windows, prepend this script's directory and its parent (the
    # bin/ directory) to PATH so bundled executables resolve first.
    if os.name == 'nt':
        script_dir = os.path.dirname(sys.argv[0])
        parent_dir = os.path.join(script_dir, '..')
        os.environ['PATH'] = ';'.join([script_dir,
                                       parent_dir,
                                       os.environ['PATH']])

    pump_transfer.exit_handler(DocLoader().main(sys.argv))
331