#!/usr/bin/env python

import logging
import os
import simplejson as json
import struct
import sys
import shutil
import tempfile
import zipfile

import couchbaseConstants
import pump

JSON_SCHEME = "json://"
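
# ZipUtil provides a ZipFile.extractall-style helper that normalizes member
# paths (stripping any leading "/") before writing entries under the target
# directory. It is used below to unpack .zip sources into a temporary
# working directory.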
class ZipUtil:
    def __init__(self, zipobj):
        self.zipobj = zipobj

    def extractall(self, path=None):
        if path is None:
            path = os.getcwd()
        if (path[-1] in (os.path.sep, os.path.altsep)
            and len(os.path.splitdrive(path)[1]) > 1):
            path = path[:-1]

        for member in self.zipobj.namelist():
            if not isinstance(member, zipfile.ZipInfo):
                member = self.zipobj.getinfo(member)

            # don't include leading "/" from file name if present
            if member.filename[0] == '/':
                targetpath = os.path.join(path, member.filename[1:])
            else:
                targetpath = os.path.join(path, member.filename)

            targetpath = os.path.normpath(targetpath)

            # Create all parent directories if necessary.
            upperdirs = os.path.dirname(targetpath)
            if upperdirs and not os.path.exists(upperdirs):
                try:
                    os.makedirs(upperdirs)
                except OSError:
                    logging.error("Unexpected error: " + str(sys.exc_info()[0]))
                    return upperdirs

            if member.filename[-1] == '/':
                if not os.path.isdir(targetpath):
                    try:
                        os.mkdir(targetpath)
                    except OSError:
                        logging.error("Failed to create directory: " + targetpath)
                continue

            target = open(targetpath, "wb")
            target.write(self.zipobj.read(member.filename))
            target.close()

        return path

class JSONSource(pump.Source):
    """Reads a JSON file, a directory, or a zip file that contains JSON files."""

    def __init__(self, opts, spec, source_bucket, source_node,
                 source_map, sink_map, ctl, cur):
        super(JSONSource, self).__init__(opts, spec, source_bucket, source_node,
                                         source_map, sink_map, ctl, cur)
        self.done = False
        self.f = None
        self.views = list()

    @staticmethod
    def can_handle(opts, spec):
        return spec.startswith(JSON_SCHEME) and \
            (os.path.isfile(spec.replace(JSON_SCHEME, "")) or \
             os.path.isdir(spec.replace(JSON_SCHEME, "")) or \
             spec.endswith(".zip"))

    @staticmethod
    def check(opts, spec):
        return 0, {'spec': spec,
                   'buckets': [{'name': os.path.basename(spec),
                                'nodes': [{'hostname': 'N/A'}]}]}
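
    # save_doc() turns one JSON document into a batch entry: documents without
    # an '_id' field are treated as data and appended as TAP mutations, while
    # documents carrying '_id' are treated as design documents and appended as
    # serialized JSON; the is_data flag selects which kind this pass collects.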
    @staticmethod
    def save_doc(batch, dockey, datafile, is_data):
        cmd = couchbaseConstants.CMD_TAP_MUTATION
        vbucket_id = 0x0000ffff
        cas, exp, flg = 0, 0, 0
        try:
            raw_data = datafile.read()
            doc = json.loads(raw_data)
            if '_id' not in doc:
                if is_data:
                    msg = (cmd, vbucket_id, dockey, flg, exp, cas, '', raw_data, 0, 0, 0)
                    batch.append(msg, len(raw_data))
            else:
                id = doc['_id'].encode('UTF-8')
                del doc['_id']
                docdata = {"doc": {
                    "json": doc,
                    "meta": {"id": id}
                }}
                if not is_data:
                    batch.append(json.dumps(docdata), len(docdata))
        except ValueError, error:
            logging.error("Failed to read JSON file with error: " + str(error))

    @staticmethod
    def gen_dockey(filename):
        return os.path.splitext(os.path.basename(filename))[0]
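
    # enumerate_and_save() walks a directory tree recursively; every regular
    # file is saved as a document, and subdirectories whose names contain
    # "design_docs" are visited only after the regular data directories.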
    @staticmethod
    def enumerate_and_save(batch, subdir, is_data):
        if not subdir:
            return
        subdirlist = list()
        viewdirs = list()
        for item in os.listdir(subdir):
            if os.path.isfile(os.path.join(subdir, item)):
                try:
                    fp = open(os.path.join(subdir, item), 'r')
                    dockey = JSONSource.gen_dockey(item)
                    JSONSource.save_doc(batch, dockey, fp, is_data)
                    fp.close()
                except IOError, error:
                    logging.error("Failed to load JSON file with error: " + str(error))
            else:
                if item.find("design_docs") > 0:
                    viewdirs.append(os.path.join(subdir, item))
                else:
                    subdirlist.append(os.path.join(subdir, item))
        for dir in subdirlist:
            JSONSource.enumerate_and_save(batch, dir, is_data)
        for dir in viewdirs:
            JSONSource.enumerate_and_save(batch, dir, is_data)
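
    # This source does not supply design documents through provide_design().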
    @staticmethod
    def provide_design(opts, source_spec, source_bucket, source_map):
        return 0, None
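
    # provide_batch() is effectively single-shot: the first call loads every
    # document reachable from the source spec (zip archive, directory tree, or
    # single file) into one batch, and subsequent calls return an empty result.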
    def provide_batch(self):
        if self.done:
            return 0, None

        if not self.f:
            self.f = self.spec.replace(JSON_SCHEME, "")
        else:
            return 0, None

        batch = pump.Batch(self)
        if self.f:
            if os.path.isfile(self.f) and self.f.endswith(".zip"):
                zfobj = zipfile.ZipFile(self.f)
                working_dir = tempfile.mkdtemp()
                ZipUtil(zfobj).extractall(working_dir)
                JSONSource.enumerate_and_save(batch, working_dir, True)
                shutil.rmtree(working_dir)
            elif os.path.isdir(self.f):
                JSONSource.enumerate_and_save(batch, self.f, True)
            else:
                try:
                    fp = open(self.f, 'r')
                    dockey = JSONSource.gen_dockey(os.path.basename(self.f))
                    # A standalone file is treated as a data document, matching
                    # the directory and zip branches above.
                    JSONSource.save_doc(batch, dockey, fp, True)
                    fp.close()
                except IOError, error:
                    return "error: could not open json: %s; exception: %s" % \
                        (self.f, error), None

        if batch.size() <= 0:
            return 0, None
        return 0, batch