#!/usr/bin/env python

import glob
import logging
import os
import re
import simplejson as json
import struct
import threading

import couchstore
import couchbaseConstants
import pump
from cbcollections import defaultdict

from cbqueue import PumpQueue

SFD_SCHEME = "couchstore-files://"
SFD_VBUCKETS = 1024
SFD_REV_META = ">QIIBB" # cas, exp, flg, flex_meta, dtype
SFD_REV_SEQ = ">Q"
SFD_DB_SEQ = ">Q"
SFD_RE = "^([0-9]+)\\.couch\\.([0-9]+)$"
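
# Illustrative sketch of the constants above (comments only, not executed):
#
#   >>> struct.calcsize(SFD_REV_META)   # cas, exp, flg, flex_meta, dtype
#   18
#   >>> re.match(SFD_RE, "123.couch.4").groups()   # vbucket_id, compaction num
#   ('123', '4')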

# TODO: (1) SFDSource - total_msgs.
# TODO: (1) SFDSink - ensure right user for bucket_dir.
# TODO: (1) SFDSink - ensure right user for couchstore file.

class SFDSource(pump.Source):
    """Reads couchstore files from a couchbase server data directory."""

    def __init__(self, opts, spec, source_bucket, source_node,
                 source_map, sink_map, ctl, cur):
        super(SFDSource, self).__init__(opts, spec, source_bucket, source_node,
                                        source_map, sink_map, ctl, cur)
        self.done = False
        self.queue = None

    @staticmethod
    def can_handle(opts, spec):
        return spec.startswith(SFD_SCHEME)

    @staticmethod
    def check_base(opts, spec):
        # Skip immediate superclass Source.check_base(),
        # since SFDSource can handle different vbucket states.
        return pump.EndPoint.check_base(opts, spec)

    @staticmethod
    def check(opts, spec):
        rv, d = data_dir(spec)
        if rv != 0:
            return rv, None

        buckets = []

        for bucket_dir in sorted(glob.glob(d + "/*/")):
            if not glob.glob(bucket_dir + "/*.couch.*"):
                continue
            bucket_name = os.path.basename(os.path.dirname(bucket_dir))
            if not bucket_name:
                return "error: bucket_name too short: " + bucket_dir, None
            rv, v = SFDSource.vbucket_states(opts, spec, bucket_dir)
            if rv != 0:
                return rv, None
            buckets.append({'name': bucket_name,
                            'nodes': [{'hostname': 'N/A',
                                       'vbucket_states': v}]})

        if not buckets:
            return "error: no bucket subdirectories at: " + d, None

        return 0, {'spec': spec, 'buckets': buckets}

    @staticmethod
    def vbucket_states(opts, spec, bucket_dir):
        """Reads all the latest couchstore files in a directory, and returns
           a map of state string (e.g., 'active') to a map of vbucket_id to doc."""
        vbucket_states = defaultdict(dict)

        for f in latest_couch_files(bucket_dir):
            vbucket_id = int(re.match(SFD_RE, os.path.basename(f)).group(1))
            try:
                store = couchstore.CouchStore(f, 'r')
                try:
                    doc_str = store.localDocs['_local/vbstate']
                    if doc_str:
                        doc = json.loads(doc_str)
                        state = doc.get('state', None)
                        if state:
                            vbucket_states[state][vbucket_id] = doc
                        else:
                            return "error: missing vbucket_state from: %s" \
                                % (f), None
                except Exception, e:
                    return ("error: could not read _local/vbstate from: %s" +
                            "; exception: %s") % (f, e), None
                store.close()
            except Exception, e:
                return ("error: could not read couchstore file: %s" +
                        "; exception: %s") % (f, e), None

        if vbucket_states:
            return 0, vbucket_states
        return "error: no vbucket_states in files: %s" % (bucket_dir), None
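
    # Illustrative sketch of a successful vbucket_states() result, assuming
    # one active and one replica vbucket (all field values hypothetical):
    #
    #   (0, {'active':  {0:   {'state': 'active', 'checkpoint_id': '1',
    #                          'max_deleted_seqno': '0'}},
    #        'replica': {512: {'state': 'replica', 'checkpoint_id': '1',
    #                          'max_deleted_seqno': '0'}}})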

    @staticmethod
    def provide_design(opts, source_spec, source_bucket, source_map):
        rv, d = data_dir(source_spec)
        if rv != 0:
            return rv, None

        bucket_dir = d + '/' + source_bucket['name']
        if not os.path.isdir(bucket_dir):
            return 0, None

        rv, store, store_path = \
            open_latest_store(bucket_dir,
                              "master.couch.*",
                              "^(master)\\.couch\\.([0-9]+)$",
                              "master.couch.0",
                              mode='r')
        if rv != 0 or not store:
            return rv, None

        rows = []

        for doc_info in store.changesSince(0):
            if not doc_info.deleted:
                try:
                    doc_contents = doc_info.getContents(options=couchstore.CouchStore.DECOMPRESS)
                except Exception, e:
                    return ("error: could not read design doc: %s" +
                            "; source_spec: %s; exception: %s") % \
                            (doc_info.id, source_spec, e), None
                try:
                    doc = json.loads(doc_contents)
                except ValueError, e:
                    return ("error: could not parse design doc: %s" +
                            "; source_spec: %s; exception: %s") % \
                            (doc_info.id, source_spec, e), None

                doc['id'] = doc.get('id', doc_info.id)
                doc['_rev'] = doc.get('_rev', doc_info.revSequence)

                rows.append({'id': doc_info.id, 'doc': doc})

        store.close()

        return 0, json.dumps(rows)
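
    # Illustrative sketch of the JSON string provide_design() returns,
    # assuming a single design doc (names and view bodies hypothetical):
    #
    #   [{"id": "_design/dev_beers",
    #     "doc": {"id": "_design/dev_beers", "_rev": 1,
    #             "views": {"by_name": {"map": "function (doc) {...}"}}}}]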

    def provide_batch(self):
        if self.done:
            return 0, None

        if not self.queue:
            name = "c" + threading.currentThread().getName()[1:]
            self.queue = PumpQueue(2)
            self.thread = threading.Thread(target=self.loader, name=name)
            self.thread.daemon = True
            self.thread.start()

        rv, batch = self.queue.get()
        self.queue.task_done()
        if rv != 0 or batch is None:
            self.done = True
        return rv, batch

    def loader(self):
        rv, d = data_dir(self.spec)
        if rv != 0:
            self.queue.put((rv, None))
            return

        source_vbucket_state = \
            getattr(self.opts, 'source_vbucket_state', 'active')

        source_nodes = self.source_bucket['nodes']
        if len(source_nodes) != 1:
            self.queue.put(("error: expected 1 node in source_bucket: %s"
                            % (self.source_bucket['name']), None))
            return

        vbucket_states = source_nodes[0].get('vbucket_states', None)
        if not vbucket_states:
            self.queue.put(("error: missing vbucket_states in source_bucket: %s"
                            % (self.source_bucket['name']), None))
            return

        vbuckets = vbucket_states.get(source_vbucket_state, None)
        if vbuckets is None: # Empty dict is valid.
            self.queue.put(("error: missing vbuckets in source_bucket: %s"
                            % (self.source_bucket['name']), None))
            return

        batch_max_size = self.opts.extra['batch_max_size']
        batch_max_bytes = self.opts.extra['batch_max_bytes']

        store = None
        vbucket_id = None

        # Level of indirection since we can't use the Python 3 nonlocal statement.
        abatch = [pump.Batch(self)]

        def change_callback(doc_info):
            if doc_info:
                key = doc_info.id
                if self.skip(key, vbucket_id):
                    return

                if doc_info.deleted:
                    cmd = couchbaseConstants.CMD_TAP_DELETE
                    val = ''
                else:
                    cmd = couchbaseConstants.CMD_TAP_MUTATION
                    val = doc_info.getContents(options=couchstore.CouchStore.DECOMPRESS)

                cas, exp, flg, flex_meta, dtype = struct.unpack(SFD_REV_META, doc_info.revMeta)
                meta = doc_info.revSequence
                seqno = doc_info.sequence
                nmeta = 0
                # Same field order that SFDSink.run() unpacks from each msg.
                msg = (cmd, vbucket_id, key, flg, exp, cas, meta, val, seqno, dtype, nmeta)
                abatch[0].append(msg, len(val))

            if (abatch[0].size() >= batch_max_size or
                abatch[0].bytes >= batch_max_bytes):
                self.queue.put((0, abatch[0]))
                abatch[0] = pump.Batch(self)

        for f in latest_couch_files(d + '/' + self.source_bucket['name']):
            vbucket_id = int(re.match(SFD_RE, os.path.basename(f)).group(1))
            if vbucket_id not in vbuckets:
                continue

            try:
                store = couchstore.CouchStore(f, 'r')
            except Exception, e:
                self.queue.put(("error: could not open couchstore file: %s"
                                "; exception: %s" % (f, e), None))
                return

            store.forEachChange(0, change_callback)
            store.close()

        if abatch[0].size():
            self.queue.put((0, abatch[0]))
        self.queue.put((0, None))


class SFDSink(pump.Sink):
    """Sink for couchstore in couchbase server/file/directory layout."""

    def __init__(self, opts, spec, source_bucket, source_node,
                 source_map, sink_map, ctl, cur):
        super(SFDSink, self).__init__(opts, spec, source_bucket, source_node,
                                      source_map, sink_map, ctl, cur)
        self.rehash = opts.extra.get("rehash", 0)
        self.init_worker(SFDSink.run)

    @staticmethod
    def run(self):
        # Worker function; init_worker() invokes it with the sink instance as
        # its argument, hence the explicit self parameter on a staticmethod.
        destination_vbucket_state = \
            getattr(self.opts, 'destination_vbucket_state', 'active')

        vbucket_states = self.source_node.get('vbucket_states', {})

        while not self.ctl['stop']:
            batch, future = self.pull_next_batch()
            if not batch:
                return self.future_done(future, 0)

            vbuckets = batch.group_by_vbucket_id(SFD_VBUCKETS, self.rehash)
            for vbucket_id, msgs in vbuckets.iteritems():
                checkpoint_id = 0
                max_deleted_seqno = 0

                rv, store, store_path = self.open_store(vbucket_id)
                if rv != 0:
                    return self.future_done(future, rv)

                bulk_keys = []
                bulk_vals = []

                for msg in msgs:
                    cmd, _vbucket_id, key, flg, exp, cas, meta, val, seqno, dtype, nmeta = msg
                    if self.skip(key, vbucket_id):
                        continue

                    d = couchstore.DocumentInfo(str(key))
                    flex_meta = 1
                    d.revMeta = str(struct.pack(SFD_REV_META, cas, exp, flg, flex_meta, dtype))
                    if meta:
                        # Normalize meta to exactly 8 bytes before unpacking
                        # the big-endian rev sequence number.
                        if len(meta) > 8:
                            meta = meta[0:8]
                        if len(meta) < 8:
                            meta = ('\x00\x00\x00\x00\x00\x00\x00\x00' + meta)[-8:]
                        d.revSequence, = struct.unpack(SFD_REV_SEQ, meta)
                    else:
                        d.revSequence = 1

                    if seqno:
                        d.sequence = int(seqno)
                    if cmd == couchbaseConstants.CMD_TAP_MUTATION:
                        v = str(val)
                        try:
                            if (re.match('^\\s*{', v) and
                                json.loads(v) is not None):
                                d.contentType = couchstore.DocumentInfo.IS_JSON
                        except ValueError:
                            pass # NON_JSON is already the default contentType.
                    elif cmd == couchbaseConstants.CMD_TAP_DELETE:
                        v = None
                    else:
                        self.future_done(future,
                                         "error: SFDSink bad cmd: " + str(cmd))
                        store.close()
                        return

                    bulk_keys.append(d)
                    bulk_vals.append(v)

                try:
                    if bulk_keys and bulk_vals:
                        vm = vbucket_states.get(destination_vbucket_state, None)
                        if vm:
                            vi = vm.get(vbucket_id, None)
                            if vi:
                                c = int(vi.get("checkpoint_id", checkpoint_id))
                                checkpoint_id = max(checkpoint_id, c)
                                m = int(vi.get("max_deleted_seqno", max_deleted_seqno))
                                max_deleted_seqno = max(max_deleted_seqno, m)

                        rv = self.save_vbucket_state(store, vbucket_id,
                                                     destination_vbucket_state,
                                                     checkpoint_id,
                                                     max_deleted_seqno)
                        if rv != 0:
                            self.future_done(future, rv)
                            store.close()
                            return

                        store.saveMultiple(bulk_keys, bulk_vals,
                                           options=couchstore.CouchStore.COMPRESS)

                    store.commit()
                    store.close()
                except Exception, e:
                    self.future_done(future,
                                     "error: could not save couchstore data"
                                     "; vbucket_id: %s; store_path: %s"
                                     "; exception: %s"
                                     % (vbucket_id, store_path, e))
                    return

            self.future_done(future, 0) # No return to keep looping.

    def save_vbucket_state(self, store, vbucket_id,
                           state, checkpoint_id, max_deleted_seqno):
        doc = json.dumps({'state': state,
                          'checkpoint_id': str(checkpoint_id),
                          'max_deleted_seqno': str(max_deleted_seqno)})
        try:
            store.localDocs['_local/vbstate'] = doc
        except Exception, e:
            return "error: save_vbucket_state() failed: " + str(e)
        return 0
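
    # Illustrative sketch of the '_local/vbstate' document that
    # save_vbucket_state() writes (values hypothetical; note that the
    # numeric fields are serialized as strings):
    #
    #   {"state": "active", "checkpoint_id": "5", "max_deleted_seqno": "0"}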

    @staticmethod
    def can_handle(opts, spec):
        return spec.startswith(SFD_SCHEME)

    @staticmethod
    def check_base(opts, spec):
        if getattr(opts, "destination_operation", None) is not None:
            return ("error: --destination-operation" +
                    " is not supported by this destination: %s") % (spec)

        # Skip immediate superclass Sink.check_base(),
        # since SFDSink can handle different vbucket states.
        return pump.EndPoint.check_base(opts, spec)

    @staticmethod
    def check(opts, spec, source_map):
        # TODO: (2) SFDSink - check disk space.

        rv, d = data_dir(spec)
        if rv != 0:
            return rv, None
        if not os.path.isdir(d):
            return "error: not a directory: " + d, None
        if not os.access(d, os.W_OK):
            return "error: directory is not writable: " + d, None
        return 0, None

    @staticmethod
    def consume_design(opts, sink_spec, sink_map,
                       source_bucket, source_map, source_design):
        if not source_design:
            return 0

        try:
            sd = json.loads(source_design)
        except ValueError:
            return "error: could not parse source_design: " + source_design

        rv, d = data_dir(sink_spec)
        if rv != 0:
            return rv

        bucket_dir = d + '/' + source_bucket['name']
        if not os.path.isdir(bucket_dir):
            os.mkdir(bucket_dir)

        rv, store, store_path = \
            open_latest_store(bucket_dir,
                              "master.couch.*",
                              "^(master)\\.couch\\.([0-9]+)$",
                              "master.couch.1")
        if rv != 0:
            return rv

        bulk_keys = []
        bulk_vals = []

        if sd:
            for row in sd['rows']:
                logging.debug("design_doc row: " + str(row))

                d = couchstore.DocumentInfo(str(row['id']))
                if '_rev' in row['doc']:
                    d.revMeta = str(row['doc']['_rev'])
                    del row['doc']['_rev']
                d.contentType = couchstore.DocumentInfo.IS_JSON

                bulk_keys.append(d)
                bulk_vals.append(json.dumps(row['doc']))

            if bulk_keys and bulk_vals:
                store.saveMultiple(bulk_keys, bulk_vals) # TODO: Compress ddocs?

        store.commit()
        store.close()
        return 0

    def consume_batch_async(self, batch):
        return self.push_next_batch(batch, pump.SinkBatchFuture(self, batch))

    def open_store(self, vbucket_id):
        # data_dir   => /opt/couchbase/var/lib/couchbase/data/
        # bucket_dir =>   default/
        # store_path =>     VBUCKET_ID.couch.COMPACTION_NUM
        if vbucket_id >= SFD_VBUCKETS:
            return "error: vbucket_id too large: %s" % (vbucket_id), None, None

        rv, bucket_dir = self.find_bucket_dir()
        if rv != 0:
            return rv, None, None

        return open_latest_store(bucket_dir, "%s.couch.*" % (vbucket_id), SFD_RE,
                                 str(vbucket_id) + ".couch.1", mode='c')

    def find_bucket_dir(self):
        rv, d = data_dir(self.spec)
        if rv != 0:
            return rv, None

        bucket_dir = d + '/' + self.source_bucket['name']
        if not os.path.isdir(bucket_dir):
            try:
                os.mkdir(bucket_dir)
            except OSError, e:
                return ("error: could not create bucket_dir: %s; exception: %s"
                        % (bucket_dir, e)), None

        return 0, bucket_dir

def open_latest_store(bucket_dir, glob_pattern, filter_re, default_name, mode='c'):
    store_paths = latest_couch_files(bucket_dir,
                                     glob_pattern=glob_pattern,
                                     filter_re=filter_re)
    if not store_paths:
        if mode == 'r':
            return 0, None, None
        store_paths = [bucket_dir + '/' + default_name]
    if len(store_paths) != 1:
        return ("error: no single, latest couchstore file: %s" +
                "; found: %s") % (glob_pattern, store_paths), None, None
    try:
        return 0, couchstore.CouchStore(str(store_paths[0]), mode), store_paths[0]
    except Exception, e:
        return ("error: could not open couchstore file: %s" +
                "; exception: %s") % (store_paths[0], e), None, None
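
# Illustrative usage sketch for open_latest_store() (path hypothetical):
# open the newest master file read-only; in mode 'r', a missing file yields
# rv == 0 with store None rather than an error.
#
#   rv, store, store_path = open_latest_store('/data/default',
#                                             'master.couch.*',
#                                             '^(master)\\.couch\\.([0-9]+)$',
#                                             'master.couch.0', mode='r')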

def latest_couch_files(bucket_dir, glob_pattern='*.couch.*', filter_re=SFD_RE):
    """Given a directory of *.couch.VER files, returns the files with the
       largest VER suffixes."""
    files = glob.glob(bucket_dir + '/' + glob_pattern)
    matches = [(re.match(filter_re, os.path.basename(f)), f) for f in files]
    latest = {}
    for match, f in matches:
        if not match:
            continue
        top, _ = latest.get(match.group(1), (-1, None))
        cur = int(match.group(2))
        if cur > top:
            latest[match.group(1)] = (cur, f)
    return sorted([f for top, f in latest.values()])
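
# Illustrative sketch for latest_couch_files() (paths hypothetical): given
# 0.couch.1, 0.couch.3 and 1.couch.2 in bucket_dir, only the highest
# compaction number per vbucket survives:
#
#   >>> latest_couch_files('/data/default')
#   ['/data/default/0.couch.3', '/data/default/1.couch.2']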

def data_dir(spec):
    if not spec.startswith(SFD_SCHEME):
        return "error: wrong scheme in spec: " + spec, None
    d = spec[len(SFD_SCHEME):]
    if d:
        return 0, os.path.normpath(d)
    else:
        return "error: missing dir in spec: " + spec, None
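
# Illustrative sketch for data_dir() (path hypothetical):
#
#   >>> data_dir('couchstore-files:///opt/couchbase/var/lib/couchbase/data')
#   (0, '/opt/couchbase/var/lib/couchbase/data')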