#!/usr/bin/env python

import csv
import logging
import os
import simplejson as json
import struct
import sys
import urllib

import couchbaseConstants
import pump
import cbsnappy as snappy
def number_try_parse(s):
    """Return s as an int or float when the conversion round-trips
       exactly; otherwise return the original string."""
    for func in (int, float):
        try:
            v = func(s)
            if s == str(v):
                return v
        except ValueError:
            pass
    return s
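
# A minimal usage sketch (not part of the original module): the s == str(v)
# round-trip check means values that would not reproduce themselves exactly,
# such as zero-padded ids, stay as strings.
def _demo_number_try_parse():
    assert number_try_parse("42") == 42        # int round-trips
    assert number_try_parse("3.5") == 3.5      # float round-trips
    assert number_try_parse("007") == "007"    # str(7) != "007", kept as text
    assert number_try_parse("abc") == "abc"    # not numeric at all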

class CSVSource(pump.Source):
    """Reads a CSV file whose first line holds the field names;
       one of those fields must be 'id'."""

    def __init__(self, opts, spec, source_bucket, source_node,
                 source_map, sink_map, ctl, cur):
        super(CSVSource, self).__init__(opts, spec, source_bucket, source_node,
                                        source_map, sink_map, ctl, cur)
        self.done = False
        self.r = None  # An iterator from csv.reader().

    @staticmethod
    def can_handle(opts, spec):
        return spec.endswith(".csv") and os.path.isfile(spec)

    @staticmethod
    def check(opts, spec):
        return 0, {'spec': spec,
                   'buckets': [{'name': os.path.basename(spec),
                                'nodes': [{'hostname': 'N/A'}]}]}

    @staticmethod
    def provide_design(opts, source_spec, source_bucket, source_map):
        return 0, None

    def provide_batch(self):
        if self.done:
            return 0, None

        if not self.r:
            try:
                self.r = csv.reader(open(self.spec, 'rU'))
                self.fields = self.r.next()
                if 'id' not in self.fields:
                    return ("error: no 'id' field in 1st line of csv: %s" %
                            (self.spec)), None
            except StopIteration:
                return ("error: could not read 1st line of csv: %s" %
                        (self.spec)), None
            except IOError, e:
                return ("error: could not open csv: %s; exception: %s" %
                        (self.spec, e)), None

        batch = pump.Batch(self)

        batch_max_size = self.opts.extra['batch_max_size']
        batch_max_bytes = self.opts.extra['batch_max_bytes']

        cmd = couchbaseConstants.CMD_TAP_MUTATION
        vbucket_id = 0x0000ffff  # Special vbucket id: let the sink route the key.
        cas, exp, flg = 0, 0, 0

        while (self.r and
               batch.size() < batch_max_size and
               batch.bytes < batch_max_bytes):
            try:
                vals = self.r.next()
                doc = {}
                for i, field in enumerate(self.fields):
                    if i >= len(vals):
                        continue  # Row is shorter than the header; skip missing fields.
                    if field == 'id':
                        doc[field] = vals[i]
                    else:
                        doc[field] = number_try_parse(vals[i])
                if doc.get('id'):
                    doc_json = json.dumps(doc)
                    msg = (cmd, vbucket_id, doc['id'], flg, exp, cas, '', doc_json, 0, 0, 0)
                    batch.append(msg, len(doc))
            except StopIteration:
                self.done = True
                self.r = None
            except Exception, e:
                logging.error("error: failed to read from csv file, %s", e)
                continue

        if batch.size() <= 0:
            return 0, None
        return 0, batch
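
# Input sketch (hypothetical file contents): given a CSV such as
#
#   id,name,count
#   k1,widget,3
#
# provide_batch() emits one TAP mutation per data row, using the 'id'
# column as the document key and the row serialized as JSON for the value:
#   {"id": "k1", "name": "widget", "count": 3}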


class CSVSink(pump.Sink):
    """Emits batches to stdout or a .csv file, in CSV format."""
    CSV_SCHEME = "csv:"
    CSV_JSON_SCHEME = "csv-json:"

    def __init__(self, opts, spec, source_bucket, source_node,
                 source_map, sink_map, ctl, cur):
        super(CSVSink, self).__init__(opts, spec, source_bucket, source_node,
                                      source_map, sink_map, ctl, cur)
        self.writer = None
        self.fields = None

    def bucket_name(self):
        if 'name' in self.source_bucket:
            return self.source_bucket['name']
        return ""

    def node_name(self):
        if 'hostname' in self.source_node:
            return self.source_node['hostname']
        return ""

    @staticmethod
    def can_handle(opts, spec):
        if spec.startswith(CSVSink.CSV_SCHEME) or spec.startswith(CSVSink.CSV_JSON_SCHEME):
            opts.threads = 1  # Force a single thread so stdout writes don't interleave.
            return True
        return False
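
    # Spec examples this sink accepts (illustrative paths, not from the
    # original source):
    #   csv:/tmp/out.csv        one row per mutation, values written verbatim
    #   csv-json:/tmp/out.csv   JSON values flattened into per-field columns
    #   csv:out                 no .csv suffix, so rows go to stdout instead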

    @staticmethod
    def check(opts, spec, source_map):
        rv = 0
        if spec.endswith(".csv"):
            if spec.startswith(CSVSink.CSV_JSON_SCHEME):
                targetpath = spec[len(CSVSink.CSV_JSON_SCHEME):]
            else:
                targetpath = spec[len(CSVSink.CSV_SCHEME):]
            targetpath = os.path.normpath(targetpath)
            rv = pump.mkdirs(targetpath)

        return rv, None

    @staticmethod
    def consume_design(opts, sink_spec, sink_map,
                       source_bucket, source_map, source_design):
        if source_design:
            logging.warn("warning: cannot save bucket design"
                         " on a CSV destination")
        return 0

    def consume_batch_async(self, batch):
        if not self.writer:
            csvfile = sys.stdout
            if self.spec.startswith(CSVSink.CSV_JSON_SCHEME):
                if len(batch.msgs) <= 0:
                    future = pump.SinkBatchFuture(self, batch)
                    self.future_done(future, 0)
                    return 0, future

                # Derive the column set from the first document in the batch.
                cmd, vbucket_id, key, flg, exp, cas, meta, val = batch.msgs[0][:8]
                doc = json.loads(val)
                self.fields = sorted(doc.keys())
                if 'id' not in self.fields:
                    self.fields = ['id'] + self.fields
                if self.spec.endswith(".csv"):
                    filename = self.get_csvfile(self.spec[len(CSVSink.CSV_JSON_SCHEME):])
                    try:
                        csvfile = open(filename, "wb")
                    except IOError, e:
                        return ("error: could not write csv to file: %s; exception: %s" %
                                (filename, e)), None
                self.writer = csv.writer(csvfile)
                self.writer.writerow(self.fields)
            else:
                if self.spec.endswith(".csv"):
                    filename = self.get_csvfile(self.spec[len(CSVSink.CSV_SCHEME):])
                    try:
                        csvfile = open(filename, "wb")
                    except IOError, e:
                        return ("error: could not write csv to file: %s; exception: %s" %
                                (filename, e)), None
                self.writer = csv.writer(csvfile)
                self.writer.writerow(['id', 'flags', 'expiration', 'cas', 'value', 'rev', 'vbid', 'dtype'])

        msg_tuple_format = 0
        for msg in batch.msgs:
            cmd, vbucket_id, key, flg, exp, cas, meta, val = msg[:8]
            if self.skip(key, vbucket_id):
                continue
            if not msg_tuple_format:
                msg_tuple_format = len(msg)
            seqno = dtype = nmeta = 0
            if msg_tuple_format > 8:
                seqno, dtype, nmeta = msg[8:]
            if dtype > 2:
                # Datatype indicates a snappy-compressed value; decompress best-effort.
                try:
                    val = snappy.uncompress(val)
                except Exception, err:
                    pass
            try:
                if cmd in [couchbaseConstants.CMD_TAP_MUTATION,
                           couchbaseConstants.CMD_DCP_MUTATION]:
                    if self.fields:
                        if val and len(val) > 0:
                            try:
                                row = []
                                doc = json.loads(val)
                                if type(doc) == dict:
                                    for field in self.fields:
                                        if field == 'id':
                                            row.append(key)
                                        else:
                                            # Later docs may lack fields seen in the first doc.
                                            row.append(doc.get(field, ''))
                                    self.writer.writerow(row)
                            except ValueError:
                                pass
                    else:
                        #rev = self.convert_meta(meta)
                        self.writer.writerow([key, flg, exp, cas, val, meta, vbucket_id, dtype])
                elif cmd in [couchbaseConstants.CMD_TAP_DELETE, couchbaseConstants.CMD_DCP_DELETE]:
                    pass
                elif cmd == couchbaseConstants.CMD_GET:
                    pass
                else:
                    return "error: CSVSink - unknown cmd: " + str(cmd), None
            except IOError:
                return "error: could not write csv to stdout", None

        future = pump.SinkBatchFuture(self, batch)
        self.future_done(future, 0)
        return 0, future
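
    # Output sketch (hypothetical values): the plain csv: scheme writes the
    # fixed header below, while csv-json: derives its header from the sorted
    # fields of the first document in the first batch.
    #
    #   csv:       id,flags,expiration,cas,value,rev,vbid,dtype
    #   csv-json:  count,id,name   (then one column value per field, per row)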

    def get_csvfile(self, base):
        """Splice the URL-quoted bucket and node names into the base
           filename, just before its extension."""
        root, ext = os.path.splitext(base)
        filename = root
        if self.bucket_name():
            filename = filename + "_" + urllib.quote_plus(self.bucket_name())
        if self.node_name():
            filename = filename + "_" + urllib.quote_plus(self.node_name())
        return filename + ext
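
    # Naming sketch (hypothetical bucket/node values): for base "out.csv",
    # bucket "default" and node "host:8091", the result would be
    # "out_default_host%3A8091.csv", since both names are URL-quoted:
    #   urllib.quote_plus("host:8091") == "host%3A8091"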

    def convert_meta(self, meta):
        """Interpret meta as an 8-byte, big-endian sequence number;
           a zero result is bumped to 1."""
        seq_no = str(meta)
        if len(seq_no) > 8:
            seq_no = seq_no[0:8]
        if len(seq_no) < 8:
            seq_no = ('\x00\x00\x00\x00\x00\x00\x00\x00' + seq_no)[-8:]
        check_seqno, = struct.unpack(">Q", seq_no)
        if not check_seqno:
            check_seqno = 1

        return check_seqno
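

# A minimal sketch (not part of the original module) of convert_meta's
# padding: short metas are left-padded with NULs to 8 bytes before the
# big-endian unpack, so a one-byte meta lands in the lowest byte.
def _demo_convert_meta_padding():
    padded = ('\x00' * 8 + '\x05')[-8:]      # 8 bytes: seven NULs then 0x05
    value, = struct.unpack(">Q", padded)
    assert value == 5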