xref: /4.6.0/couchbase-cli/pump_sfd.py (revision 951668b6)
#!/usr/bin/env python

import glob
import json
import logging
import os
import Queue
import re
import struct
import threading

import couchstore
import couchbaseConstants
import pump
from collections import defaultdict
SFD_SCHEME = "couchstore-files://"
SFD_VBUCKETS = 1024
SFD_REV_META = ">QIIBB" # cas, exp, flg, flex_meta, dtype
SFD_REV_SEQ = ">Q"
SFD_DB_SEQ = ">Q"
SFD_RE = "^([0-9]+)\\.couch\\.([0-9]+)$"
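
# A minimal sketch of how these constants are used; the values below are
# illustrative, not taken from a real data directory:
#
#   >>> re.match(SFD_RE, "123.couch.2").groups()            # ('123', '2')
#   >>> blob = struct.pack(SFD_REV_META, 4660, 0, 0, 1, 0)  # 18-byte blob
#   >>> cas, exp, flg, flex_meta, dtype = struct.unpack(SFD_REV_META, blob)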

# TODO: (1) SFDSource - total_msgs.
# TODO: (1) SFDSink - ensure right user for bucket_dir.
# TODO: (1) SFDSink - ensure right user for couchstore file.

class SFDSource(pump.Source):
29    """Reads couchstore files from a couchbase server data directory."""
30
31    def __init__(self, opts, spec, source_bucket, source_node,
32                 source_map, sink_map, ctl, cur):
33        super(SFDSource, self).__init__(opts, spec, source_bucket, source_node,
34                                        source_map, sink_map, ctl, cur)
35        self.done = False
36        self.queue = None
37
    @staticmethod
    def can_handle(opts, spec):
        return spec.startswith(SFD_SCHEME)

    @staticmethod
    def check_base(opts, spec):
        # Skip immediate superclass Source.check_base(),
        # since SFDSource can handle different vbucket states.
        return pump.EndPoint.check_base(opts, spec)

    @staticmethod
    def check(opts, spec):
        rv, d = data_dir(spec)
        if rv != 0:
            return rv, None

        buckets = []

        for bucket_dir in sorted(glob.glob(d + "/*/")):
            if not glob.glob(bucket_dir + "/*.couch.*"):
                continue
            bucket_name = os.path.basename(os.path.dirname(bucket_dir))
            if not bucket_name:
                return "error: bucket_name too short: " + bucket_dir, None
            rv, v = SFDSource.vbucket_states(opts, spec, bucket_dir)
            if rv != 0:
                return rv, None
            buckets.append({'name': bucket_name,
                            'nodes': [{'hostname': 'N/A',
                                       'vbucket_states': v}]})

        if not buckets:
            return "error: no bucket subdirectories at: " + d, None

        return 0, {'spec': spec, 'buckets': buckets}

    @staticmethod
    def vbucket_states(opts, spec, bucket_dir):
        """Reads all the latest couchstore files in a directory, and returns
           a map of state string (e.g., 'active') to a map of vbucket_id to doc."""
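        # A sketch of the successful return shape, with illustrative values:
        #
        #   (0, {'active':  {0: {'state': 'active', 'checkpoint_id': '1', ...},
        #                    1: {...}},
        #        'replica': {2: {...}}})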
        vbucket_states = defaultdict(dict)

        for f in latest_couch_files(bucket_dir):
            vbucket_id = int(re.match(SFD_RE, os.path.basename(f)).group(1))
            try:
                store = couchstore.CouchStore(f, 'r')
                try:
                    doc_str = store.localDocs['_local/vbstate']
                    if doc_str:
                        doc = json.loads(doc_str)
                        state = doc.get('state', None)
                        if state:
                            vbucket_states[state][vbucket_id] = doc
                        else:
                            return "error: missing vbucket_state from: %s" \
                                % (f), None
                except Exception, e:
                    return ("error: could not read _local/vbstate from: %s" +
                            "; exception: %s") % (f, e), None
                store.close()
            except Exception, e:
                return ("error: could not read couchstore file: %s" +
                        "; exception: %s") % (f, e), None

        if vbucket_states:
            return 0, vbucket_states
        return "error: no vbucket_states in files: %s" % (bucket_dir), None

    @staticmethod
    def provide_design(opts, source_spec, source_bucket, source_map):
        rv, d = data_dir(source_spec)
        if rv != 0:
            return rv, None

        bucket_dir = d + '/' + source_bucket['name']
        if not os.path.isdir(bucket_dir):
            return 0, None

        rv, store, store_path = \
            open_latest_store(bucket_dir,
                              "master.couch.*",
                              "^(master)\\.couch\\.([0-9]+)$",
                              "master.couch.0",
                              mode='r')
        if rv != 0 or not store:
            return rv, None

        rows = []

        for doc_info in store.changesSince(0):
            if not doc_info.deleted:
                try:
                    doc_contents = doc_info.getContents(options=couchstore.CouchStore.DECOMPRESS)
                except Exception, e:
                    return ("error: could not read design doc: %s" +
                            "; source_spec: %s; exception: %s") % \
                            (doc_info.id, source_spec, e), None
                try:
                    doc = json.loads(doc_contents)
                except ValueError, e:
                    return ("error: could not parse design doc: %s" +
                            "; source_spec: %s; exception: %s") % \
                            (doc_info.id, source_spec, e), None

                doc['id'] = doc.get('id', doc_info.id)
                doc['_rev'] = doc.get('_rev', doc_info.revSequence)

                rows.append({'id': doc_info.id, 'doc': doc})

        store.close()

        return 0, json.dumps(rows)

    def provide_batch(self):
        if self.done:
            return 0, None

        if not self.queue:
            name = "c" + threading.currentThread().getName()[1:]
            self.queue = Queue.Queue(2)
            self.thread = threading.Thread(target=self.loader, name=name)
            self.thread.daemon = True
            self.thread.start()

        rv, batch = self.queue.get()
        self.queue.task_done()
        if rv != 0 or batch is None:
            self.done = True
        return rv, batch

    def loader(self):
        rv, d = data_dir(self.spec)
        if rv != 0:
            self.queue.put((rv, None))
            return

        source_vbucket_state = \
            getattr(self.opts, 'source_vbucket_state', 'active')

        source_nodes = self.source_bucket['nodes']
        if len(source_nodes) != 1:
            self.queue.put(("error: expected 1 node in source_bucket: %s"
                            % (self.source_bucket['name']), None))
            return

        vbucket_states = source_nodes[0].get('vbucket_states', None)
        if not vbucket_states:
            self.queue.put(("error: missing vbucket_states in source_bucket: %s"
                            % (self.source_bucket['name']), None))
            return

        vbuckets = vbucket_states.get(source_vbucket_state, None)
        if vbuckets is None: # Empty dict is valid.
            self.queue.put(("error: missing vbuckets in source_bucket: %s"
                            % (self.source_bucket['name']), None))
            return

        batch_max_size = self.opts.extra['batch_max_size']
        batch_max_bytes = self.opts.extra['batch_max_bytes']

        store = None
        vbucket_id = None

        # Level of indirection since we can't use the Python 3 nonlocal statement.
        abatch = [pump.Batch(self)]
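
        # Under Python 3, the one-element list could instead be a plain
        # variable rebound with "nonlocal"; a hypothetical sketch:
        #
        #   batch = pump.Batch(self)
        #   def change_callback(doc_info):
        #       nonlocal batch
        #       ...
        #       batch = pump.Batch(self)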

        def change_callback(doc_info):
            if doc_info:
                key = doc_info.id
                if self.skip(key, vbucket_id):
                    return

                if doc_info.deleted:
                    cmd = couchbaseConstants.CMD_TAP_DELETE
                    val = ''
                else:
                    cmd = couchbaseConstants.CMD_TAP_MUTATION
                    val = doc_info.getContents(options=couchstore.CouchStore.DECOMPRESS)
                try:
                    cas, exp, flg, flex_meta, dtype = struct.unpack(SFD_REV_META, doc_info.revMeta)
                    meta = doc_info.revSequence
                    seqno = doc_info.sequence
                    nmeta = 0
                    msg = (cmd, vbucket_id, key, flg, exp, cas, meta, val, seqno, dtype, nmeta, 0)
                    abatch[0].append(msg, len(val))
                except Exception, e:
                    self.queue.put(("error: could not read couchstore file due to unsupported file format version;"
                                    " exception: %s" % e, None))
                    return

            if (abatch[0].size() >= batch_max_size or
                abatch[0].bytes >= batch_max_bytes):
                self.queue.put((0, abatch[0]))
                abatch[0] = pump.Batch(self)

        for f in latest_couch_files(d + '/' + self.source_bucket['name']):
            vbucket_id = int(re.match(SFD_RE, os.path.basename(f)).group(1))
            if vbucket_id not in vbuckets:
                continue

            try:
                store = couchstore.CouchStore(f, 'r')
                store.forEachChange(0, change_callback)
                store.close()
            except Exception, e:
                # MB-12270: Some files may be deleted due to compaction. We can
                # safely ignore them and move on to the next file.
                pass

        if abatch[0].size():
            self.queue.put((0, abatch[0]))
        self.queue.put((0, None))


class SFDSink(pump.Sink):
    """Sink for couchstore in Couchbase server/file/directory layout."""

    def __init__(self, opts, spec, source_bucket, source_node,
                 source_map, sink_map, ctl, cur):
        super(SFDSink, self).__init__(opts, spec, source_bucket, source_node,
                                      source_map, sink_map, ctl, cur)
        self.rehash = opts.extra.get("rehash", 0)
        self.init_worker(SFDSink.run)

    @staticmethod
    def run(self):
        destination_vbucket_state = \
            getattr(self.opts, 'destination_vbucket_state', 'active')

        vbucket_states = self.source_node.get('vbucket_states', {})

        while not self.ctl['stop']:
            batch, future = self.pull_next_batch()
            if not batch:
                return self.future_done(future, 0)

            vbuckets = batch.group_by_vbucket_id(SFD_VBUCKETS, self.rehash)
            for vbucket_id, msgs in vbuckets.iteritems():
                checkpoint_id = 0
                max_deleted_seqno = 0

                rv, store, store_path = self.open_store(vbucket_id)
                if rv != 0:
                    return self.future_done(future, rv)

                bulk_keys = []
                bulk_vals = []

                for msg in msgs:
                    cmd, _vbucket_id, key, flg, exp, cas, meta, val, seqno, dtype, nmeta, conf_res = msg
                    if self.skip(key, vbucket_id):
                        continue

                    d = couchstore.DocumentInfo(str(key))
                    flex_meta = 1
                    d.revMeta = str(struct.pack(SFD_REV_META, cas, exp, flg, flex_meta, dtype))
                    if meta:
                        if len(meta) > 8:
                            meta = meta[0:8]
                        if len(meta) < 8:
                            meta = ('\x00\x00\x00\x00\x00\x00\x00\x00' + meta)[-8:]
                        d.revSequence, = struct.unpack(SFD_REV_SEQ, meta)
                    else:
                        d.revSequence = 1
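
                    # A sketch of the rev-seqno handling above (illustrative):
                    #
                    #   >>> m = ('\x00' * 8 + '\x02')[-8:]        # left-pad to 8 bytes
                    #   >>> seq, = struct.unpack(SFD_REV_SEQ, m)  # seq == 2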

                    if seqno:
                        d.sequence = int(seqno)
                    if cmd == couchbaseConstants.CMD_TAP_MUTATION:
                        v = str(val)
                        try:
                            if (re.match('^\\s*{', v) and
                                json.loads(v) is not None):
                                d.contentType = couchstore.DocumentInfo.IS_JSON
                        except ValueError:
                            pass # NON_JSON is already the default contentType.
                    elif cmd == couchbaseConstants.CMD_TAP_DELETE:
                        v = None
                    else:
                        self.future_done(future,
                                         "error: SFDSink bad cmd: " + str(cmd))
                        store.close()
                        return

                    bulk_keys.append(d)
                    bulk_vals.append(v)

                try:
                    if bulk_keys and bulk_vals:
                        vm = vbucket_states.get(destination_vbucket_state, None)
                        if vm:
                            vi = vm.get(vbucket_id, None)
                            if vi:
                                c = int(vi.get("checkpoint_id", checkpoint_id))
                                checkpoint_id = max(checkpoint_id, c)
                                m = int(vi.get("max_deleted_seqno", max_deleted_seqno))
                                max_deleted_seqno = max(max_deleted_seqno, m)

                        rv = self.save_vbucket_state(store, vbucket_id,
                                                     destination_vbucket_state,
                                                     checkpoint_id,
                                                     max_deleted_seqno)
                        if rv != 0:
                            self.future_done(future, rv)
                            store.close()
                            return

                        store.saveMultiple(bulk_keys, bulk_vals,
                                           options=couchstore.CouchStore.COMPRESS)

                    store.commit()
                    store.close()
                except Exception, e:
                    self.future_done(future,
                                     "error: could not save couchstore data"
                                     "; vbucket_id: %s; store_path: %s"
                                     "; exception: %s"
                                     % (vbucket_id, store_path, e))
                    return

            self.future_done(future, 0) # No return to keep looping.

    def save_vbucket_state(self, store, vbucket_id,
                           state, checkpoint_id, max_deleted_seqno):
        doc = json.dumps({'state': state,
                          'checkpoint_id': str(checkpoint_id),
                          'max_deleted_seqno': str(max_deleted_seqno)})
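        # The resulting '_local/vbstate' doc, with illustrative values:
        #
        #   {"state": "active", "checkpoint_id": "1", "max_deleted_seqno": "0"}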
        try:
            store.localDocs['_local/vbstate'] = doc
        except Exception, e:
            return "error: save_vbucket_state() failed: " + str(e)
        return 0

    @staticmethod
    def can_handle(opts, spec):
        return spec.startswith(SFD_SCHEME)

    @staticmethod
    def check_base(opts, spec):
        if getattr(opts, "destination_operation", None) is not None:
            return ("error: --destination-operation" +
                    " is not supported by this destination: %s") % (spec)

        # Skip immediate superclass Sink.check_base(),
        # since SFDSink can handle different vbucket states.
        return pump.EndPoint.check_base(opts, spec)

    @staticmethod
    def check(opts, spec, source_map):
        # TODO: (2) SFDSink - check disk space.

        rv, d = data_dir(spec)
        if rv != 0:
            return rv, None
        if not os.path.isdir(d):
            return "error: not a directory: " + d, None
        if not os.access(d, os.W_OK):
            return "error: directory is not writable: " + d, None
        return 0, None

    @staticmethod
    def consume_design(opts, sink_spec, sink_map,
                       source_bucket, source_map, source_design):
        if not source_design:
            return 0
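
        # source_design is expected to be a JSON object carrying a 'rows'
        # list; an illustrative (not authoritative) shape:
        #
        #   {"rows": [{"id": "_design/dev_beer",
        #              "doc": {"_rev": "1-abc", "views": {...}}}]}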

        try:
            sd = json.loads(source_design)
        except ValueError:
            return "error: could not parse source_design: " + source_design

        rv, d = data_dir(sink_spec)
        if rv != 0:
            return rv

        bucket_dir = d + '/' + source_bucket['name']
        if not os.path.isdir(bucket_dir):
            os.mkdir(bucket_dir)

        rv, store, store_path = \
            open_latest_store(bucket_dir,
                              "master.couch.*",
                              "^(master)\\.couch\\.([0-9]+)$",
                              "master.couch.1")
        if rv != 0:
            return rv

        bulk_keys = []
        bulk_vals = []

        if sd:
            for row in sd['rows']:
                logging.debug("design_doc row: " + str(row))

                d = couchstore.DocumentInfo(str(row['id']))
                if '_rev' in row['doc']:
                    d.revMeta = str(row['doc']['_rev'])
                    del row['doc']['_rev']
                d.contentType = couchstore.DocumentInfo.IS_JSON

                bulk_keys.append(d)
                bulk_vals.append(json.dumps(row['doc']))

            if bulk_keys and bulk_vals:
                store.saveMultiple(bulk_keys, bulk_vals) # TODO: Compress ddocs?

        store.commit()
        store.close()
        return 0

    def consume_batch_async(self, batch):
        return self.push_next_batch(batch, pump.SinkBatchFuture(self, batch))

    def open_store(self, vbucket_id):
        # data_dir   => /opt/couchbase/var/lib/couchbase/data/
        # bucket_dir =>   default/
        # store_path =>     VBUCKET_ID.couch.COMPACTION_NUM
        if vbucket_id >= SFD_VBUCKETS:
            return "error: vbucket_id too large: %s" % (vbucket_id), None, None

        rv, bucket_dir = self.find_bucket_dir()
        if rv != 0:
            return rv, None, None

        return open_latest_store(bucket_dir, "%s.couch.*" % (vbucket_id), SFD_RE,
                                 str(vbucket_id) + ".couch.1", mode='c')

    def find_bucket_dir(self):
        rv, d = data_dir(self.spec)
        if rv != 0:
            return rv, None

        bucket_dir = d + '/' + self.source_bucket['name']
        if not os.path.isdir(bucket_dir):
            try:
                os.mkdir(bucket_dir)
            except OSError, e:
                return ("error: could not create bucket_dir: %s; exception: %s"
                        % (bucket_dir, e)), None

        return 0, bucket_dir

def open_latest_store(bucket_dir, glob_pattern, filter_re, default_name, mode='c'):
    store_paths = latest_couch_files(bucket_dir,
                                     glob_pattern=glob_pattern,
                                     filter_re=filter_re)
    if not store_paths:
        if mode == 'r':
            return 0, None, None
        store_paths = [bucket_dir + '/' + default_name]
    if len(store_paths) != 1:
        return ("error: no single, latest couchstore file: %s" +
                "; found: %s") % (glob_pattern, store_paths), None, None
    try:
        return 0, couchstore.CouchStore(str(store_paths[0]), mode), store_paths[0]
    except Exception, e:
        return ("error: could not open couchstore file: %s" +
                "; exception: %s") % (store_paths[0], e), None, None

def latest_couch_files(bucket_dir, glob_pattern='*.couch.*', filter_re=SFD_RE):
    """Given a directory of *.couch.VER files, returns the files with the
       largest VER suffixes."""
    files = glob.glob(bucket_dir + '/' + glob_pattern)
    files = [f for f in files if re.match(filter_re, os.path.basename(f))]
    matches = [(re.match(filter_re, os.path.basename(f)), f) for f in files]
    latest = {}
    for match, fname in matches:
        top, _ = latest.get(match.group(1), (-1, None))
        cur = int(match.group(2))
        if cur > top:
            latest[match.group(1)] = (cur, fname)
    return sorted([fname for top, fname in latest.values()])

def data_dir(spec):
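    # A sketch of the spec parsing (the path below is illustrative):
    #
    #   >>> data_dir("couchstore-files:///opt/couchbase/var/lib/couchbase/data")
    #   (0, '/opt/couchbase/var/lib/couchbase/data')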
    if not spec.startswith(SFD_SCHEME):
        return "error: wrong scheme in spec: " + spec, None
    path = spec[len(SFD_SCHEME):]
    if path:
        return 0, os.path.normpath(path)
    else:
        return "error: missing dir in spec: " + spec, None