xref: /3.0.3-GA/couchbase-cli/pump.py (revision 777028a7)
#!/usr/bin/env python

import os
import base64
import copy
import httplib
import logging
import re
import simplejson as json
import string
import sys
import threading
import time
import urlparse
import zlib
import platform
import subprocess

import couchbaseConstants
from cbcollections import defaultdict
from cbqueue import PumpQueue
import cbsnappy as snappy

# TODO: (1) optionally log into backup directory

LOGGING_FORMAT = '%(asctime)s: %(threadName)s %(message)s'

NA = 'N/A'

class ProgressReporter(object):
    """Mixin to report progress"""

    def report_init(self):
        self.beg_time = time.time()
        self.prev_time = self.beg_time
        self.prev = defaultdict(int)

    def report(self, prefix="", emit=None):
        if not emit:
            emit = logging.info

        if getattr(self, "source", None):
            emit(prefix + "source : %s" % (self.source))
        if getattr(self, "sink", None):
            emit(prefix + "sink   : %s" % (self.sink))

        cur_time = time.time()
        delta = max(cur_time - self.prev_time, 0.000001) # Avoid divide-by-zero on back-to-back reports.
        c, p = self.cur, self.prev
        x = sorted([k for k in c.iterkeys() if "_sink_" in k])

        width_k = max([5] + [len(k.replace("tot_sink_", "")) for k in x])
        width_v = max([20] + [len(str(c[k])) for k in x])
        width_d = max([10] + [len(str(c[k] - p[k])) for k in x])
        width_s = max([10] + [len("%0.1f" % ((c[k] - p[k]) / delta)) for k in x])
        emit(prefix + " %s : %s | %s | %s"
             % (string.ljust("", width_k),
                string.rjust("total", width_v),
                string.rjust("last", width_d),
                string.rjust("per sec", width_s)))
        verbose_set = ["tot_sink_batch", "tot_sink_msg"]
        for k in x:
            if k not in verbose_set or self.opts.verbose > 0:
                emit(prefix + " %s : %s | %s | %s"
                     % (string.ljust(k.replace("tot_sink_", ""), width_k),
                        string.rjust(str(c[k]), width_v),
                        string.rjust(str(c[k] - p[k]), width_d),
                        string.rjust("%0.1f" % ((c[k] - p[k]) / delta), width_s)))
        self.prev_time = cur_time
        self.prev = copy.copy(c)

    def bar(self, current, total):
        if not total:
            return '.'
        if sys.platform.lower().startswith('win'):
            cr = "\r"
        else:
            cr = chr(27) + "[A\n"
        pct = float(current) / total
        max_hash = 20
        num_hash = int(round(pct * max_hash))
        return ("  [%s%s] %0.1f%% (%s/estimated %s msgs)%s" %
                ('#' * num_hash, ' ' * (max_hash - num_hash),
                 100.0 * pct, current, total, cr))
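
    # Example (hypothetical counts): bar(50, 200) fills 5 of 20 slots:
    #
    #   [#####               ] 25.0% (50/estimated 200 msgs)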

class PumpingStation(ProgressReporter):
    """Queues and watchdogs multiple pumps across concurrent workers."""

    def __init__(self, opts, source_class, source_spec, sink_class, sink_spec):
        self.opts = opts
        self.source_class = source_class
        self.source_spec = source_spec
        self.sink_class = sink_class
        self.sink_spec = sink_spec
        self.queue = None
        tmstamp = time.strftime("%Y-%m-%dT%H%M%SZ", time.gmtime())
        self.ctl = { 'stop': False,
                     'rv': 0,
                     'new_session': True,
                     'new_timestamp': tmstamp}
        self.cur = defaultdict(int)

    def run(self):
        # TODO: (6) PumpingStation - monitor source for topology changes.
        # TODO: (4) PumpingStation - retry on err N times, M times / server.
        # TODO: (2) PumpingStation - track checksum in backup, for later restore.

        rv, source_map, sink_map = self.check_endpoints()
        if rv != 0:
            return rv

        if self.opts.dry_run:
            sys.stderr.write("done, but no data written due to dry-run\n")
            return 0

        source_buckets = self.filter_source_buckets(source_map)
        if not source_buckets:
            bucket_source = getattr(self.opts, "bucket_source", None)
            if bucket_source:
                return ("error: there is no bucket: %s at source: %s" %
                        (bucket_source, self.source_spec))
            else:
                return ("error: no transferable buckets at source: %s" %
                        (self.source_spec))

        for source_bucket in sorted(source_buckets,
                                    key=lambda b: b['name']):
            logging.info("bucket: " + source_bucket['name'])

            if not self.opts.extra.get("design_doc_only", 0):
                rv = self.transfer_bucket_msgs(source_bucket, source_map, sink_map)
                if rv != 0:
                    return rv
            else:
                sys.stderr.write("transferring design docs only; bucket msgs will be skipped.\n")

            if not self.opts.extra.get("data_only", 0):
                rv = self.transfer_bucket_design(source_bucket, source_map, sink_map)
                if rv != 0:
                    return rv
            else:
                sys.stderr.write("transferring data only; bucket design docs will be skipped.\n")

            # TODO: (5) PumpingStation - validate bucket transfers.

        # TODO: (4) PumpingStation - validate source/sink maps were stable.

        sys.stderr.write("done\n")
        return 0

    def check_endpoints(self):
        logging.debug("source_class: %s", self.source_class)
        rv = self.source_class.check_base(self.opts, self.source_spec)
        if rv != 0:
            return rv, None, None
        rv, source_map = self.source_class.check(self.opts, self.source_spec)
        if rv != 0:
            return rv, None, None

        logging.debug("sink_class: %s", self.sink_class)
        rv = self.sink_class.check_base(self.opts, self.sink_spec)
        if rv != 0:
            return rv, None, None
        rv, sink_map = self.sink_class.check(self.opts, self.sink_spec, source_map)
        if rv != 0:
            return rv, None, None

        return rv, source_map, sink_map

    def filter_source_buckets(self, source_map):
        """Filter the source_buckets if a bucket_source was specified."""
        source_buckets = source_map['buckets']
        logging.debug("source_buckets: " +
                      ",".join([n['name'] for n in source_buckets]))

        bucket_source = getattr(self.opts, "bucket_source", None)
        if bucket_source:
            logging.debug("bucket_source: " + bucket_source)
            source_buckets = [b for b in source_buckets
                              if b['name'] == bucket_source]
            logging.debug("source_buckets filtered: " +
                          ",".join([n['name'] for n in source_buckets]))
        return source_buckets

    def filter_source_nodes(self, source_bucket, source_map):
        """Filter the source_bucket's nodes if single_node was specified."""
        if getattr(self.opts, "single_node", None):
            if not source_map.get('spec_parts'):
                return ("error: no single_node from source: %s" +
                        "; the source may not support the --single-node flag") % \
                        (self.source_spec)
            source_nodes = filter_bucket_nodes(source_bucket,
                                               source_map.get('spec_parts'))
        else:
            source_nodes = source_bucket['nodes']

        logging.debug(" source_nodes: " + ",".join([n.get('hostname', NA)
                                                    for n in source_nodes]))
        return source_nodes

    def transfer_bucket_msgs(self, source_bucket, source_map, sink_map):
        source_nodes = self.filter_source_nodes(source_bucket, source_map)

        # Transfer bucket msgs with a Pump per source server.
        self.start_workers(len(source_nodes))
        self.report_init()

        self.ctl['run_msg'] = 0
        self.ctl['tot_msg'] = 0

        for source_node in sorted(source_nodes,
                                  key=lambda n: n.get('hostname', NA)):
            logging.debug(" enqueueing node: " +
                          source_node.get('hostname', NA))
            self.queue.put((source_bucket, source_node, source_map, sink_map))

            rv, tot = self.source_class.total_msgs(self.opts,
                                                   source_bucket,
                                                   source_node,
                                                   source_map)
            if rv != 0:
                return rv
            if tot:
                self.ctl['tot_msg'] += tot

        # Don't use queue.join() as it eats Ctrl-C's.
        s = 0.05
        while self.queue.unfinished_tasks:
            time.sleep(s)
            s = min(1.0, s + 0.01)

        rv = self.ctl['rv']
        if rv != 0:
            return rv

        time.sleep(0.01) # Allows threads to update counters.

        sys.stderr.write(self.bar(self.ctl['run_msg'],
                                  self.ctl['tot_msg']) + "\n")
        sys.stderr.write("bucket: " + source_bucket['name'] +
                         ", msgs transferred...\n")
        def emit(msg):
            sys.stderr.write(msg + "\n")
        self.report(emit=emit)

        return 0

    def transfer_bucket_design(self, source_bucket, source_map, sink_map):
        """Transfer bucket design (e.g., design docs, views)."""
        rv, source_design = \
            self.source_class.provide_design(self.opts, self.source_spec,
                                             source_bucket, source_map)
        if rv == 0:
            if source_design:
                rv = self.sink_class.consume_design(self.opts,
                                                    self.sink_spec, sink_map,
                                                    source_bucket, source_map,
                                                    source_design)
        return rv

    # Note: run_worker is a staticmethod despite its 'self' parameter;
    # start_workers() passes the PumpingStation instance explicitly via
    # args=(self, i), so all worker threads share this station.
    @staticmethod
    def run_worker(self, thread_index):
        while not self.ctl['stop']:
            source_bucket, source_node, source_map, sink_map = \
                self.queue.get()
            hostname = source_node.get('hostname', NA)
            logging.debug(" node: %s" % (hostname))

            curx = defaultdict(int)
            self.source_class.check_spec(source_bucket,
                                         source_node,
                                         self.opts,
                                         self.source_spec,
                                         curx)
            self.sink_class.check_spec(source_bucket,
                                       source_node,
                                       self.opts,
                                       self.sink_spec,
                                       curx)
            rv = Pump(self.opts,
                      self.source_class(self.opts, self.source_spec,
                                        source_bucket, source_node,
                                        source_map, sink_map, self.ctl, curx),
                      self.sink_class(self.opts, self.sink_spec,
                                      source_bucket, source_node,
                                      source_map, sink_map, self.ctl, curx),
                      source_map, sink_map, self.ctl, curx).run()

            for k, v in curx.items():
                if isinstance(v, int):
                    self.cur[k] = self.cur.get(k, 0) + v

            logging.debug(" node: %s, done; rv: %s" % (hostname, rv))
            if self.ctl['rv'] == 0 and rv != 0:
                self.ctl['rv'] = rv

            self.queue.task_done()

    def start_workers(self, queue_size):
        if self.queue:
            return

        self.queue = PumpQueue(queue_size)

        threads = [threading.Thread(target=PumpingStation.run_worker,
                                    name="w" + str(i), args=(self, i))
                   for i in range(self.opts.threads)]
        for thread in threads:
            thread.daemon = True
            thread.start()

    @staticmethod
    def find_handler(opts, x, classes):
        for s in classes:
            if s.can_handle(opts, x):
                return s
        return None
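
    # Hypothetical usage sketch (the real driver lives in the transfer
    # tools, not in this module): pick handler classes for a pair of
    # specs, then run a PumpingStation over them.
    #
    #   source_class = PumpingStation.find_handler(opts, "-", [StdInSource])
    #   sink_class = PumpingStation.find_handler(opts, "stdout:", [StdOutSink])
    #   if source_class and sink_class:
    #       rv = PumpingStation(opts, source_class, "-",
    #                           sink_class, "stdout:").run()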


class Pump(ProgressReporter):
    """Moves batches of data from one Source to one Sink."""

    def __init__(self, opts, source, sink, source_map, sink_map, ctl, cur):
        self.opts = opts
        self.source = source
        self.sink = sink
        self.source_map = source_map
        self.sink_map = sink_map
        self.ctl = ctl
        self.cur = cur # Should be a defaultdict(int); 0 as default value.

    def run(self):
        future = None

        # TODO: (2) Pump - timeouts when providing/consuming/waiting.

        report = int(self.opts.extra.get("report", 5))
        report_full = int(self.opts.extra.get("report_full", 2000))

        self.report_init()

        n = 0

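        # A minimal pipeline: at most one batch is in flight. While the
        # sink consumes batch N asynchronously (tracked by 'future'),
        # the loop asks the source for batch N+1, then waits for batch N
        # to be consumed before handing N+1 to the sink.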
        while not self.ctl['stop']:
            rv_batch, batch = self.source.provide_batch()
            if rv_batch != 0:
                return self.done(rv_batch)

            if future:
                rv = future.wait_until_consumed()
                if rv != 0:
                    # TODO: (5) Pump - retry logic on consume error.
                    return self.done(rv)

                self.cur['tot_sink_batch'] += 1
                self.cur['tot_sink_msg'] += future.batch.size()
                self.cur['tot_sink_byte'] += future.batch.bytes

                self.ctl['run_msg'] += future.batch.size()
                self.ctl['tot_msg'] += future.batch.adjust_size

            if not batch:
                return self.done(0)

            self.cur['tot_source_batch'] += 1
            self.cur['tot_source_msg'] += batch.size()
            self.cur['tot_source_byte'] += batch.bytes

            rv_future, future = self.sink.consume_batch_async(batch)
            if rv_future != 0:
                return self.done(rv_future)

            n = n + 1
            if report_full > 0 and n % report_full == 0:
                if self.opts.verbose > 0:
                    sys.stderr.write("\n")
                logging.info("  progress...")
                self.report(prefix="  ")
            elif report > 0 and n % report == 0:
                sys.stderr.write(self.bar(self.ctl['run_msg'],
                                          self.ctl['tot_msg']))

        return self.done(0)

    def done(self, rv):
        self.source.close()
        self.sink.close()

        logging.debug("  pump (%s->%s) done.", self.source, self.sink)
        self.report(prefix="  ")

        if (rv == 0 and
            (self.cur['tot_source_batch'] != self.cur['tot_sink_batch'] or
             self.cur['tot_source_msg'] != self.cur['tot_sink_msg'] or
             self.cur['tot_source_byte'] != self.cur['tot_sink_byte'])):
            return "error: sink missing some source msgs: " + str(self.cur)

        return rv


# --------------------------------------------------

class EndPoint(object):

    def __init__(self, opts, spec, source_bucket, source_node,
                 source_map, sink_map, ctl, cur):
        self.opts = opts
        self.spec = spec
        self.source_bucket = source_bucket
        self.source_node = source_node
        self.source_map = source_map
        self.sink_map = sink_map
        self.ctl = ctl
        self.cur = cur

        self.only_key_re = None
        k = getattr(opts, "key", None)
        if k:
            self.only_key_re = re.compile(k)

        self.only_vbucket_id = getattr(opts, "id", None)

    @staticmethod
    def check_base(opts, spec):
        k = getattr(opts, "key", None)
        if k:
            try:
                re.compile(k)
            except:
                return "error: could not parse key regexp: " + k
        return 0

    @staticmethod
    def check_spec(source_bucket, source_node, opts, spec, cur):
        cur['seqno'] = {}
        cur['failoverlog'] = {}
        return 0

    def __repr__(self):
        return "%s(%s@%s)" % \
            (self.spec,
             self.source_bucket.get('name', ''),
             self.source_node.get('hostname', ''))

    def close(self):
        pass

    def skip(self, key, vbucket_id):
        if self.only_key_re and not re.search(self.only_key_re, key):
            logging.warn("skipping msg with key: " + str(key))
            return True

        if (self.only_vbucket_id is not None and
            self.only_vbucket_id != vbucket_id):
            logging.warn("skipping msg of vbucket_id: " + str(vbucket_id))
            return True

        return False
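
    # Example: with opts.key = "^user:" and opts.id = 23 (hypothetical
    # values), skip() drops every msg whose key does not match ^user:
    # and every msg outside vbucket 23, logging a warning for each.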

    def add_counter(self, key, val=1):
        self.cur[key] = self.cur.get(key, 0.0) + val


class Source(EndPoint):
    """Base class for all data sources."""

    @staticmethod
    def can_handle(opts, spec):
        assert False, "unimplemented"

    @staticmethod
    def check_base(opts, spec):
        rv = EndPoint.check_base(opts, spec)
        if rv != 0:
            return rv
        if getattr(opts, "source_vbucket_state", "active") != "active":
            return ("error: only --source-vbucket-state=active" +
                    " is supported by this source: %s") % (spec)
        return 0

    @staticmethod
    def check(opts, spec):
        """Subclasses can check preconditions before any pumping starts."""
        assert False, "unimplemented"

    @staticmethod
    def provide_design(opts, source_spec, source_bucket, source_map):
        assert False, "unimplemented"

    def provide_batch(self):
        assert False, "unimplemented"

    @staticmethod
    def total_msgs(opts, source_bucket, source_node, source_map):
        return 0, None # Subclasses can return an estimated # of msgs.


class Sink(EndPoint):
    """Base class for all data sinks."""

    # TODO: (2) Sink handles filtered restore by data.

    def __init__(self, opts, spec, source_bucket, source_node,
                 source_map, sink_map, ctl, cur):
        super(Sink, self).__init__(opts, spec, source_bucket, source_node,
                                   source_map, sink_map, ctl, cur)
        self.op = None

    @staticmethod
    def can_handle(opts, spec):
        assert False, "unimplemented"

    @staticmethod
    def check_base(opts, spec):
        rv = EndPoint.check_base(opts, spec)
        if rv != 0:
            return rv
        if getattr(opts, "destination_vbucket_state", "active") != "active":
            return ("error: only --destination-vbucket-state=active" +
                    " is supported by this destination: %s") % (spec)
        if getattr(opts, "destination_operation", None) is not None:
            return ("error: --destination-operation" +
                    " is not supported by this destination: %s") % (spec)
        return 0

    @staticmethod
    def check(opts, spec, source_map):
        """Subclasses can check preconditions before any pumping starts."""
        assert False, "unimplemented"

    @staticmethod
    def consume_design(opts, sink_spec, sink_map,
                       source_bucket, source_map, source_design):
        assert False, "unimplemented"

    def consume_batch_async(self, batch):
        """Subclasses should return a SinkBatchFuture."""
        assert False, "unimplemented"

    @staticmethod
    def check_source(opts, source_class, source_spec, sink_class, sink_spec):
        if source_spec == sink_spec:
            return "error: source and sink must be different;" \
                " source: " + source_spec + \
                " sink: " + sink_spec
        return None

    def operation(self):
        if not self.op:
            self.op = getattr(self.opts, "destination_operation", None)
            if not self.op:
                self.op = "set"
                if getattr(self.opts, "add", False):
                    self.op = "add"
        return self.op

    def init_worker(self, target):
        self.worker_go = threading.Event()
        self.worker_work = None # May be None or (batch, future) tuple.
        self.worker = threading.Thread(target=target, args=(self,),
                                       name="s" + threading.currentThread().getName()[1:])
        self.worker.daemon = True
        self.worker.start()

    def push_next_batch(self, batch, future):
        """Push batch/future to worker."""
        if not self.worker.isAlive():
            return "error: cannot use a dead worker", None

        self.worker_work = (batch, future)
        self.worker_go.set()
        return 0, future

    def pull_next_batch(self):
        """Worker calls this method to get the next batch/future."""
        self.worker_go.wait()
        batch, future = self.worker_work
        self.worker_work = None
        self.worker_go.clear()
        return batch, future

    def future_done(self, future, rv):
        """Worker calls this method to finish a batch/future."""
        if rv != 0:
            logging.error("error: async operation: %s on sink: %s" %
                          (rv, self))
        if future:
            future.done_rv = rv
            future.done.set()
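
    # Hypothetical worker sketch (run_worker here is illustrative, not
    # part of this module) showing how a concrete Sink would use the
    # init_worker/pull_next_batch/future_done protocol:
    #
    #   def run_worker(sink):
    #       while not sink.ctl['stop']:
    #           batch, future = sink.pull_next_batch()
    #           rv = 0 # ... write batch.msgs to the destination ...
    #           sink.future_done(future, rv)
    #
    #   self.init_worker(run_worker)  # called from a subclass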


# --------------------------------------------------

class Batch(object):
    """Holds a batch of data being transferred from source to sink."""

    def __init__(self, source):
        self.source = source
        self.msgs = []
        self.bytes = 0
        self.adjust_size = 0

    def append(self, msg, num_bytes):
        self.msgs.append(msg)
        self.bytes = self.bytes + num_bytes

    def size(self):
        return len(self.msgs)

    def msg(self, i):
        return self.msgs[i]

    def group_by_vbucket_id(self, vbuckets_num, rehash=0):
        """Returns dict of vbucket_id->[msgs] grouped by msg's vbucket_id."""
        g = defaultdict(list)
        for msg in self.msgs:
            cmd, vbucket_id, key = msg[:3]
            if vbucket_id == 0x0000ffff or rehash == 1:
                # Special case when the source did not supply a vbucket_id
                # (such as stdin source), so we calculate it.
                vbucket_id = (zlib.crc32(key) >> 16) & (vbuckets_num - 1)
                msg = (cmd, vbucket_id) + msg[2:]
            g[vbucket_id].append(msg)
        return g
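
    # Worked example: with vbuckets_num=1024, a msg carrying the
    # "no vbucket_id" sentinel 0x0000ffff gets
    #
    #   vbucket_id = (zlib.crc32(key) >> 16) & 1023
    #
    # i.e. the upper bits of the CRC32 masked into the vbucket range.
    # The mask only acts as a modulo when vbuckets_num is a power of
    # two, which holds for Couchbase's default of 1024 vbuckets.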


class SinkBatchFuture(object):
    """Future completion of a sink consuming a batch."""

    def __init__(self, sink, batch):
        self.sink = sink
        self.batch = batch
        self.done = threading.Event()
        self.done_rv = None

    def wait_until_consumed(self):
        self.done.wait()
        return self.done_rv
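
    # Typical round trip (a sketch): the pump thread holds the future
    # returned by consume_batch_async() and blocks in
    # wait_until_consumed(); the sink's worker thread eventually calls
    # future_done(future, rv), which records done_rv and sets the event.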


# --------------------------------------------------

class StdInSource(Source):
    """Reads batches from stdin in memcached ascii protocol."""
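
    # Input grammar accepted by provide_batch() below, one command per
    # line (flags, exptime and bytes are decimal integers):
    #
    #   set <key> <flags> <exptime> <bytes>\r\n<data>\r\n
    #   add <key> <flags> <exptime> <bytes>\r\n<data>\r\n
    #   delete <key>\r\n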

    def __init__(self, opts, spec, source_bucket, source_node,
                 source_map, sink_map, ctl, cur):
        super(StdInSource, self).__init__(opts, spec, source_bucket, source_node,
                                          source_map, sink_map, ctl, cur)
        self.f = sys.stdin

    @staticmethod
    def can_handle(opts, spec):
        return spec.startswith("stdin:") or spec == "-"

    @staticmethod
    def check(opts, spec):
        return 0, {'spec': spec,
                   'buckets': [{'name': 'stdin:',
                                'nodes': [{'hostname': 'N/A'}]}]}

    @staticmethod
    def provide_design(opts, source_spec, source_bucket, source_map):
        return 0, None

    def provide_batch(self):
        batch = Batch(self)

        batch_max_size = self.opts.extra['batch_max_size']
        batch_max_bytes = self.opts.extra['batch_max_bytes']

        vbucket_id = 0x0000ffff

        while (self.f and
               batch.size() < batch_max_size and
               batch.bytes < batch_max_bytes):
            line = self.f.readline()
            if not line:
                self.f = None
                return 0, batch

            parts = line.split(' ')
            if not parts:
                return "error: read empty line", None
            elif parts[0] == 'set' or parts[0] == 'add':
                if len(parts) != 5:
                    return "error: length of set/add line: " + line, None
                cmd = couchbaseConstants.CMD_TAP_MUTATION
                key = parts[1]
                flg = int(parts[2])
                exp = int(parts[3])
                num = int(parts[4])
                if num > 0:
                    val = self.f.read(num)
                    if len(val) != num:
                        return "error: value read failed at: " + line, None
                else:
                    val = ''
                end = self.f.read(2) # Read '\r\n'.
                if len(end) != 2:
                    return "error: value end read failed at: " + line, None

                if not self.skip(key, vbucket_id):
                    msg = (cmd, vbucket_id, key, flg, exp, 0, '', val, 0, 0, 0)
                    batch.append(msg, len(val))
            elif parts[0] == 'delete':
                if len(parts) != 2:
                    return "error: length of delete line: " + line, None
                cmd = couchbaseConstants.CMD_TAP_DELETE
                key = parts[1]
                if not self.skip(key, vbucket_id):
                    msg = (cmd, vbucket_id, key, 0, 0, 0, '', '', 0, 0, 0)
                    batch.append(msg, 0)
            else:
                return "error: expected set/add/delete but got: " + line, None

        if batch.size() <= 0:
            return 0, None

        return 0, batch


class StdOutSink(Sink):
    """Emits batches to stdout in memcached ascii protocol."""

    @staticmethod
    def can_handle(opts, spec):
        if spec.startswith("stdout:") or spec == "-":
            opts.threads = 1 # Force 1 thread to not overlap stdout.
            return True
        return False

    @staticmethod
    def check(opts, spec, source_map):
        return 0, None

    @staticmethod
    def check_base(opts, spec):
        if getattr(opts, "destination_vbucket_state", "active") != "active":
            return ("error: only --destination-vbucket-state=active" +
                    " is supported by this destination: %s") % (spec)

        op = getattr(opts, "destination_operation", None)
        if op not in [None, 'set', 'add', 'get']:
            return ("error: --destination-operation unsupported value: %s" +
                    "; use set, add, get") % (op)

        # Skip immediate superclass Sink.check_base(),
        # since StdOutSink can handle different destination operations.
        return EndPoint.check_base(opts, spec)

    @staticmethod
    def consume_design(opts, sink_spec, sink_map,
                       source_bucket, source_map, source_design):
        if source_design:
            logging.warn("warning: cannot save bucket design"
                         " on a stdout destination")
        return 0

    def consume_batch_async(self, batch):
        op = self.operation()
        op_mutate = op in ['set', 'add']

        stdout = sys.stdout
        msg_visitor = None

        opts_etc = getattr(self.opts, "etc", None)
        if opts_etc:
            stdout = opts_etc.get("stdout", sys.stdout)
            msg_visitor = opts_etc.get("msg_visitor", None)

        mcd_compatible = self.opts.extra.get("mcd_compatible", 1)
        msg_tuple_format = 0
        for msg in batch.msgs:
            if msg_visitor:
                msg = msg_visitor(msg)
            if not msg_tuple_format:
                msg_tuple_format = len(msg)
            cmd, vbucket_id, key, flg, exp, cas, meta, val = msg[:8]
            seqno = dtype = nmeta = 0
            if msg_tuple_format > 8:
                seqno, dtype, nmeta = msg[8:]
            if self.skip(key, vbucket_id):
                continue
            if dtype > 2:
                try:
                    val = snappy.uncompress(val)
                except Exception:
                    pass # Value was not snappy-compressed after all; emit it as-is.
            try:
                if cmd == couchbaseConstants.CMD_TAP_MUTATION or \
                   cmd == couchbaseConstants.CMD_DCP_MUTATION:
                    if op_mutate:
                        # <op> <key> <flags> <exptime> <bytes> [noreply]\r\n
                        if mcd_compatible:
                            stdout.write("%s %s %s %s %s\r\n" %
                                         (op, key, flg, exp, len(val)))
                        else:
                            stdout.write("%s %s %s %s %s %s %s\r\n" %
                                         (op, key, flg, exp, len(val), seqno, dtype))
                        stdout.write(val)
                        stdout.write("\r\n")
                    elif op == 'get':
                        stdout.write("get %s\r\n" % (key))
                elif cmd == couchbaseConstants.CMD_TAP_DELETE or \
                     cmd == couchbaseConstants.CMD_DCP_DELETE:
                    if op_mutate:
                        stdout.write("delete %s\r\n" % (key))
                elif cmd == couchbaseConstants.CMD_GET:
                    stdout.write("get %s\r\n" % (key))
                else:
                    return "error: StdOutSink - unknown cmd: " + str(cmd), None
            except IOError:
                return "error: could not write to stdout", None

        stdout.flush()
        future = SinkBatchFuture(self, batch)
        self.future_done(future, 0)
        return 0, future
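
    # Example output for a single mutation (hypothetical key "k" and
    # value "hello", with flags 0 and exptime 0) under the default
    # "set" operation in mcd-compatible mode:
    #
    #   set k 0 0 5\r\n
    #   hello\r\n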


# --------------------------------------------------

CMD_STR = {
    couchbaseConstants.CMD_TAP_CONNECT: "TAP_CONNECT",
    couchbaseConstants.CMD_TAP_MUTATION: "TAP_MUTATION",
    couchbaseConstants.CMD_TAP_DELETE: "TAP_DELETE",
    couchbaseConstants.CMD_TAP_FLUSH: "TAP_FLUSH",
    couchbaseConstants.CMD_TAP_OPAQUE: "TAP_OPAQUE",
    couchbaseConstants.CMD_TAP_VBUCKET_SET: "TAP_VBUCKET_SET",
    couchbaseConstants.CMD_TAP_CHECKPOINT_START: "TAP_CHECKPOINT_START",
    couchbaseConstants.CMD_TAP_CHECKPOINT_END: "TAP_CHECKPOINT_END",
    couchbaseConstants.CMD_NOOP: "NOOP"
}

def get_username(username):
    return username or os.environ.get('CB_REST_USERNAME', '')

def get_password(password):
    return password or os.environ.get('CB_REST_PASSWORD', '')

def parse_spec(opts, spec, port):
    """Parse host, port, username, password, path from opts and spec."""

    # Example spec: http://Administrator:password@HOST:8091
    p = urlparse.urlparse(spec)

    # Example netloc: Administrator:password@HOST:8091
    # ParseResult tuple(scheme, netloc, path, params, query, fragment)
    netloc = p[1]

    if not netloc: # When urlparse() can't parse non-http URI's.
        netloc = spec.split('://')[-1].split('/')[0]

    pair = netloc.split('@') # [ "user:pswd", "host:port" ].
    host = (pair[-1] + ":" + str(port)).split(':')[0]
    port = (pair[-1] + ":" + str(port)).split(':')[1]
    try:
        int(port)
    except ValueError:
        logging.warn("\"" + port + "\" is not a valid port number; using default port 8091")
        port = 8091

    username = get_username(opts.username)
    password = get_password(opts.password)
    if len(pair) > 1:
        username = username or (pair[0] + ':').split(':')[0]
        password = password or (pair[0] + ':').split(':')[1]

    return host, port, username, password, p[2]
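
# Example (hypothetical credentials and host):
#
#   parse_spec(opts, "http://Administrator:password@HOST:8091/path", 8091)
#     => ("HOST", "8091", "Administrator", "password", "/path")
#
# Note that opts.username/opts.password (or CB_REST_USERNAME and
# CB_REST_PASSWORD) take precedence over credentials embedded in the spec.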

def rest_request(host, port, user, pswd, path, method='GET', body='', reason='', headers=None):
    if reason:
        reason = "; reason: %s" % (reason)
    logging.debug("rest_request: %s@%s:%s%s%s" % (user, host, port, path, reason))
    conn = httplib.HTTPConnection(host, port)
    try:
        header = rest_headers(user, pswd, headers)
        conn.request(method, path, body, header)
        resp = conn.getresponse()
    except Exception, e:
        return ("error: could not access REST API: %s:%s%s" +
                "; please check source URL, server status, username (-u) and password (-p)" +
                "; exception: %s%s") % \
                (host, port, path, e, reason), None, None

    if resp.status in [200, 201, 202, 204, 302]:
        return None, conn, resp.read()

    conn.close()
    if resp.status == 401:
        return ("error: unable to access REST API: %s:%s%s" +
                "; please check source URL, server status, username (-u) and password (-p)%s") % \
                (host, port, path, reason), None, None

    return ("error: unable to access REST API: %s:%s%s" +
            "; please check source URL, server status, username (-u) and password (-p)" +
            "; response: %s%s") % \
            (host, port, path, resp.status, reason), None, None

def rest_headers(user, pswd, headers=None):
    if not headers:
        headers = {'Content-Type': 'application/json'}
    if user:
        auth = 'Basic ' + \
            string.strip(base64.encodestring(user + ':' + (pswd or '')))
        headers['Authorization'] = auth
    return headers
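
# Example (hypothetical credentials): rest_headers("Administrator", "password")
# returns
#
#   {'Content-Type': 'application/json',
#    'Authorization': 'Basic QWRtaW5pc3RyYXRvcjpwYXNzd29yZA=='}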

def rest_request_json(host, port, user, pswd, path, reason=''):
    err, conn, rest_json = rest_request(host, port, user, pswd, path,
                                        reason=reason)
    if err:
        return err, None, None
    if conn:
        conn.close()
    try:
        return None, rest_json, json.loads(rest_json)
    except ValueError, e:
        return ("error: could not decode JSON from REST API: %s:%s%s" +
                "; exception: %s" +
                "; please check URL, username (-u) and password (-p)") % \
                (host, port, path, e), None, None

def rest_couchbase(opts, spec):
    spec = spec.replace('couchbase://', 'http://')

    spec_parts = parse_spec(opts, spec, 8091)
    host, port, user, pswd, path = spec_parts

    if not path or path == '/':
        path = '/pools/default/buckets'

    if int(port) in [11210, 11211]:
        return "error: invalid port number %s, which is reserved for moxi service" % port, None

    err, rest_json, rest_data = \
        rest_request_json(host, int(port), user, pswd, path)
    if err:
        return err, None

    if isinstance(rest_data, list):
        rest_buckets = rest_data
    else:
        rest_buckets = [rest_data] # Wrap single bucket in a list.

    buckets = []

    for bucket in rest_buckets:
        if not (bucket and
                bucket.get('name', None) and
                bucket.get('bucketType', None) and
                bucket.get('nodes', None) and
                bucket.get('nodeLocator', None)):
            return "error: unexpected JSON value from: " + spec + \
                " - did you provide the right source URL?", None

        if bucket['nodeLocator'] == 'vbucket' and \
                (bucket['bucketType'] == 'membase' or
                 bucket['bucketType'] == 'couchbase'):
            buckets.append(bucket)
        else:
            logging.warn("skipping bucket that is not a couchbase-bucket: " +
                         bucket['name'])

    if user is None or pswd is None:
        # Check if we have buckets other than the default one
        if len(rest_buckets) > 0:
            if len(rest_buckets) > 1 or rest_buckets[0].get('name', None) != "default":
                return "error: REST username (-u) and password (-p) are required " + \
                       "to access all bucket(s)", None
    return 0, {'spec': spec, 'buckets': buckets, 'spec_parts': spec_parts}
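
# On success, rest_couchbase() returns (0, map), where map is shaped like
# the following sketch (bucket contents depend on the cluster):
#
#   {'spec': 'http://HOST:8091',
#    'buckets': [...],  # couchbase-type buckets only; others are skipped
#    'spec_parts': (host, port, username, password, path)}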

def filter_bucket_nodes(bucket, spec_parts):
    host, port = spec_parts[:2]
    if host in ['localhost', '127.0.0.1']:
        host = get_ip()
    host_port = host + ':' + str(port)
    return filter(lambda n: n.get('hostname') == host_port,
                  bucket['nodes'])

def get_ip():
    ip = None
    for fname in ['/opt/couchbase/var/lib/couchbase/ip_start',
                  '/opt/couchbase/var/lib/couchbase/ip',
                  '../var/lib/couchbase/ip_start',
                  '../var/lib/couchbase/ip']:
        try:
            f = open(fname, 'r')
            ip = string.strip(f.read())
            f.close()
            if ip and len(ip):
                if '@' in ip:
                    ip = ip.split('@')[1] # Strip any "name@" prefix (e.g. "ns_1@10.0.0.1").
                break
        except:
            pass
    if not ip or not len(ip):
        ip = '127.0.0.1'
    return ip

def find_source_bucket_name(opts, source_map):
    """If the caller didn't specify a bucket_source and
       there's only one bucket in the source_map, use that."""
    source_bucket = getattr(opts, "bucket_source", None)
    if (not source_bucket and
        source_map and
        source_map['buckets'] and
        len(source_map['buckets']) == 1):
        source_bucket = source_map['buckets'][0]['name']
    if not source_bucket:
        return "error: please specify a bucket_source", None
    logging.debug("source_bucket: " + source_bucket)
    return 0, source_bucket

def find_sink_bucket_name(opts, source_bucket):
    """Default bucket_destination to the same as bucket_source."""
    sink_bucket = getattr(opts, "bucket_destination", None) or source_bucket
    if not sink_bucket:
        return "error: please specify a bucket_destination", None
    logging.debug("sink_bucket: " + sink_bucket)
    return 0, sink_bucket
def mkdirs(targetpath):
    upperdirs = os.path.dirname(targetpath)
    if upperdirs and not os.path.exists(upperdirs):
        try:
            os.makedirs(upperdirs)
        except:
            return "Cannot create upper directories for file: %s" % targetpath
    return 0