#!/usr/bin/env python

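"""Core pump infrastructure for transferring data between endpoints.

A PumpingStation fans work out across worker threads, one Pump per
source node; each Pump moves Batch objects from a Source to a Sink,
while the ProgressReporter mixin tracks counters and throughput.
"""
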
import os
import base64
import copy
import httplib
import logging
import re
import simplejson as json
import string
import sys
import threading
import time
import urlparse
import zlib
import platform
import subprocess

import couchbaseConstants
from cbcollections import defaultdict
from cbqueue import PumpQueue
import cbsnappy as snappy

# TODO: (1) optionally log into backup directory

LOGGING_FORMAT = '%(asctime)s: %(threadName)s %(message)s'

NA = 'N/A'

class ProgressReporter(object):
    """Mixin that tracks counters and reports transfer progress."""

    def report_init(self):
        self.beg_time = time.time()
        self.prev_time = self.beg_time
        self.prev = defaultdict(int)

    def report(self, prefix="", emit=None):
        if not emit:
            emit = logging.info

        if getattr(self, "source", None):
            emit(prefix + "source : %s" % (self.source))
        if getattr(self, "sink", None):
            emit(prefix + "sink   : %s" % (self.sink))

        cur_time = time.time()
        # Guard against a zero delta (back-to-back reports) to avoid
        # division by zero in the per-sec rate columns below.
        delta = max(cur_time - self.prev_time, 0.000001)
        c, p = self.cur, self.prev
        x = sorted([k for k in c.iterkeys() if "_sink_" in k])

        width_k = max([5] + [len(k.replace("tot_sink_", "")) for k in x])
        width_v = max([20] + [len(str(c[k])) for k in x])
        width_d = max([10] + [len(str(c[k] - p[k])) for k in x])
        width_s = max([10] + [len("%0.1f" % ((c[k] - p[k]) / delta)) for k in x])
        emit(prefix + " %s : %s | %s | %s"
             % (string.ljust("", width_k),
                string.rjust("total", width_v),
                string.rjust("last", width_d),
                string.rjust("per sec", width_s)))
        verbose_set = ["tot_sink_batch", "tot_sink_msg"]
        for k in x:
            if k not in verbose_set or self.opts.verbose > 0:
                emit(prefix + " %s : %s | %s | %s"
                     % (string.ljust(k.replace("tot_sink_", ""), width_k),
                        string.rjust(str(c[k]), width_v),
                        string.rjust(str(c[k] - p[k]), width_d),
                        string.rjust("%0.1f" % ((c[k] - p[k]) / delta), width_s)))
        self.prev_time = cur_time
        self.prev = copy.copy(c)

    def bar(self, current, total):
        if not total:
            return '.'
        if sys.platform.lower().startswith('win'):
            cr = "\r"  # Windows consoles: redraw via carriage return.
        else:
            cr = chr(27) + "[A\n"  # ANSI cursor-up so the bar redraws in place.
        pct = float(current) / total
        max_hash = 20
        num_hash = int(round(pct * max_hash))
        return ("  [%s%s] %0.1f%% (%s/estimated %s msgs)%s" %
                ('#' * num_hash, ' ' * (max_hash - num_hash),
                 100.0 * pct, current, total, cr))

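# A PumpingStation enqueues one work item per source node; start_workers()
# launches opts.threads daemon threads, each pulling items off the queue
# and running one Pump per item (see run_worker below).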
class PumpingStation(ProgressReporter):
    """Queues and watchdogs multiple pumps across concurrent workers."""

    def __init__(self, opts, source_class, source_spec, sink_class, sink_spec):
        self.opts = opts
        self.source_class = source_class
        self.source_spec = source_spec
        self.sink_class = sink_class
        self.sink_spec = sink_spec
        self.queue = None
        timestamp = time.strftime("%Y-%m-%dT%H%M%SZ", time.gmtime())
        self.ctl = {'stop': False,  # Workers exit when set to True.
                    'rv': 0,        # First non-zero rv seen by any worker.
                    'new_session': True,
                    'new_timestamp': timestamp}
        self.cur = defaultdict(int)  # Counters aggregated across workers.

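    # Overall flow: verify both endpoints, then for each source bucket
    # transfer its msgs (unless design_doc_only) and then its design
    # docs (unless data_only).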
    def run(self):
        # TODO: (6) PumpingStation - monitor source for topology changes.
        # TODO: (4) PumpingStation - retry on err N times, M times / server.
        # TODO: (2) PumpingStation - track checksum in backup, for later restore.

        rv, source_map, sink_map = self.check_endpoints()
        if rv != 0:
            return rv

        if self.opts.dry_run:
            sys.stderr.write("done, but no data written due to dry-run\n")
            return 0

        source_buckets = self.filter_source_buckets(source_map)
        if not source_buckets:
            bucket_source = getattr(self.opts, "bucket_source", None)
            if bucket_source:
                return ("error: there is no bucket: %s at source: %s" %
                        (bucket_source, self.source_spec))
            else:
                return ("error: no transferable buckets at source: %s" %
                        (self.source_spec))

        for source_bucket in sorted(source_buckets,
                                    key=lambda b: b['name']):
            logging.info("bucket: " + source_bucket['name'])

            if not self.opts.extra.get("design_doc_only", 0):
                rv = self.transfer_bucket_msgs(source_bucket, source_map, sink_map)
                if rv != 0:
                    return rv
            else:
                sys.stderr.write("transferring design docs only; bucket msgs will be skipped.\n")

            if not self.opts.extra.get("data_only", 0):
                rv = self.transfer_bucket_design(source_bucket, source_map, sink_map)
                if rv != 0:
                    return rv
            else:
                sys.stderr.write("transferring data only; bucket design docs will be skipped.\n")

            # TODO: (5) PumpingStation - validate bucket transfers.

        # TODO: (4) PumpingStation - validate source/sink maps were stable.

        sys.stderr.write("done\n")
        return 0

    def check_endpoints(self):
        logging.debug("source_class: %s", self.source_class)
        rv = self.source_class.check_base(self.opts, self.source_spec)
        if rv != 0:
            return rv, None, None
        rv, source_map = self.source_class.check(self.opts, self.source_spec)
        if rv != 0:
            return rv, None, None

        logging.debug("sink_class: %s", self.sink_class)
        rv = self.sink_class.check_base(self.opts, self.sink_spec)
        if rv != 0:
            return rv, None, None
        rv, sink_map = self.sink_class.check(self.opts, self.sink_spec, source_map)
        if rv != 0:
            return rv, None, None

        return rv, source_map, sink_map

    def filter_source_buckets(self, source_map):
        """Filter the source_buckets if a bucket_source was specified."""
        source_buckets = source_map['buckets']
        logging.debug("source_buckets: " +
                      ",".join([n['name'] for n in source_buckets]))

        bucket_source = getattr(self.opts, "bucket_source", None)
        if bucket_source:
            logging.debug("bucket_source: " + bucket_source)
            source_buckets = [b for b in source_buckets
                              if b['name'] == bucket_source]
            logging.debug("source_buckets filtered: " +
                          ",".join([n['name'] for n in source_buckets]))
        return source_buckets

    def filter_source_nodes(self, source_bucket, source_map):
        """Filter the source_bucket's nodes if single_node was specified."""
        if getattr(self.opts, "single_node", None):
            if not source_map.get('spec_parts'):
                return ("error: no single_node from source: %s" +
                        "; the source may not support the --single-node flag") % \
                        (self.source_spec)
            source_nodes = filter_bucket_nodes(source_bucket,
                                               source_map.get('spec_parts'))
        else:
            source_nodes = source_bucket['nodes']

        logging.debug(" source_nodes: " + ",".join([n.get('hostname', NA)
                                                    for n in source_nodes]))
        return source_nodes

    def transfer_bucket_msgs(self, source_bucket, source_map, sink_map):
        source_nodes = self.filter_source_nodes(source_bucket, source_map)

        # Transfer bucket msgs with a Pump per source server.
        self.start_workers(len(source_nodes))
        self.report_init()

        self.ctl['run_msg'] = 0
        self.ctl['tot_msg'] = 0

        for source_node in sorted(source_nodes,
                                  key=lambda n: n.get('hostname', NA)):
            logging.debug(" enqueueing node: " +
                          source_node.get('hostname', NA))
            self.queue.put((source_bucket, source_node, source_map, sink_map))

            rv, tot = self.source_class.total_msgs(self.opts,
                                                   source_bucket,
                                                   source_node,
                                                   source_map)
            if rv != 0:
                return rv
            if tot:
                self.ctl['tot_msg'] += tot

        # Don't use queue.join() as it eats Ctrl-C's.
        s = 0.05
        while self.queue.unfinished_tasks:
            time.sleep(s)
            s = min(1.0, s + 0.01)

        rv = self.ctl['rv']
        if rv != 0:
            return rv

        time.sleep(0.01) # Allows threads to update counters.

        sys.stderr.write(self.bar(self.ctl['run_msg'],
                                  self.ctl['tot_msg']) + "\n")
        sys.stderr.write("bucket: " + source_bucket['name'] +
                         ", msgs transferred...\n")
        def emit(msg):
            sys.stderr.write(msg + "\n")
        self.report(emit=emit)

        return 0

    def transfer_bucket_design(self, source_bucket, source_map, sink_map):
        """Transfer bucket design (e.g., design docs, views)."""
        rv, source_design = \
            self.source_class.provide_design(self.opts, self.source_spec,
                                             source_bucket, source_map)
        if rv == 0:
            if source_design:
                rv = self.sink_class.consume_design(self.opts,
                                                    self.sink_spec, sink_map,
                                                    source_bucket, source_map,
                                                    source_design)
        return rv

    @staticmethod
    def run_worker(self, thread_index):
        # Despite the 'self' name, this is a staticmethod: start_workers()
        # passes the PumpingStation instance explicitly as the first arg.
        while not self.ctl['stop']:
            source_bucket, source_node, source_map, sink_map = \
                self.queue.get()
            hostname = source_node.get('hostname', NA)
            logging.debug(" node: %s" % (hostname))

            curx = defaultdict(int)
            self.source_class.check_spec(source_bucket,
                                         source_node,
                                         self.opts,
                                         self.source_spec,
                                         curx)
            self.sink_class.check_spec(source_bucket,
                                       source_node,
                                       self.opts,
                                       self.sink_spec,
                                       curx)
            rv = Pump(self.opts,
                      self.source_class(self.opts, self.source_spec,
                                        source_bucket, source_node,
                                        source_map, sink_map, self.ctl, curx),
                      self.sink_class(self.opts, self.sink_spec,
                                      source_bucket, source_node,
                                      source_map, sink_map, self.ctl, curx),
                      source_map, sink_map, self.ctl, curx).run()

            for k, v in curx.items():
                if isinstance(v, int):
                    self.cur[k] = self.cur.get(k, 0) + v

            logging.debug(" node: %s, done; rv: %s" % (hostname, rv))
            if self.ctl['rv'] == 0 and rv != 0:
                self.ctl['rv'] = rv

            self.queue.task_done()

    def start_workers(self, queue_size):
        if self.queue:
            return

        self.queue = PumpQueue(queue_size)

        threads = [threading.Thread(target=PumpingStation.run_worker,
                                    name="w" + str(i), args=(self, i))
                   for i in range(self.opts.threads)]
        for thread in threads:
            thread.daemon = True
            thread.start()

    @staticmethod
    def find_handler(opts, x, classes):
        for s in classes:
            if s.can_handle(opts, x):
                return s
        return None


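# A Pump pipelines the transfer: while the sink consumes one batch
# asynchronously (via SinkBatchFuture), the source is already providing
# the next batch; the previous iteration's future is awaited before the
# new batch is handed to the sink.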
class Pump(ProgressReporter):
    """Moves batches of data from one Source to one Sink."""

    def __init__(self, opts, source, sink, source_map, sink_map, ctl, cur):
        self.opts = opts
        self.source = source
        self.sink = sink
        self.source_map = source_map
        self.sink_map = sink_map
        self.ctl = ctl
        self.cur = cur # Should be a defaultdict(int); 0 as default value.

    def run(self):
        future = None

        # TODO: (2) Pump - timeouts when providing/consuming/waiting.

        report = int(self.opts.extra.get("report", 5))
        report_full = int(self.opts.extra.get("report_full", 2000))

        self.report_init()

        n = 0

        while not self.ctl['stop']:
            rv_batch, batch = self.source.provide_batch()
            if rv_batch != 0:
                return self.done(rv_batch)

            if future:
                rv = future.wait_until_consumed()
                if rv != 0:
                    # TODO: (5) Pump - retry logic on consume error.
                    return self.done(rv)

                self.cur['tot_sink_batch'] += 1
                self.cur['tot_sink_msg'] += future.batch.size()
                self.cur['tot_sink_byte'] += future.batch.bytes

                self.ctl['run_msg'] += future.batch.size()
                self.ctl['tot_msg'] += future.batch.adjust_size

            if not batch:
                return self.done(0)

            self.cur['tot_source_batch'] += 1
            self.cur['tot_source_msg'] += batch.size()
            self.cur['tot_source_byte'] += batch.bytes

            rv_future, future = self.sink.consume_batch_async(batch)
            if rv_future != 0:
                return self.done(rv_future)

            n += 1
            if report_full > 0 and n % report_full == 0:
                if self.opts.verbose > 0:
                    sys.stderr.write("\n")
                logging.info("  progress...")
                self.report(prefix="  ")
            elif report > 0 and n % report == 0:
                sys.stderr.write(self.bar(self.ctl['run_msg'],
                                          self.ctl['tot_msg']))

        return self.done(0)

    def done(self, rv):
        self.source.close()
        self.sink.close()

        logging.debug("  pump (%s->%s) done.", self.source, self.sink)
        self.report(prefix="  ")

        if (rv == 0 and
            (self.cur['tot_source_batch'] != self.cur['tot_sink_batch'] or
             self.cur['tot_source_msg'] != self.cur['tot_sink_msg'] or
             self.cur['tot_source_byte'] != self.cur['tot_sink_byte'])):
            return "error: sink missing some source msgs: " + str(self.cur)

        return rv


# --------------------------------------------------

class EndPoint(object):
    """Base class for Source and Sink endpoints; holds shared spec state."""

    def __init__(self, opts, spec, source_bucket, source_node,
                 source_map, sink_map, ctl, cur):
        self.opts = opts
        self.spec = spec
        self.source_bucket = source_bucket
        self.source_node = source_node
        self.source_map = source_map
        self.sink_map = sink_map
        self.ctl = ctl
        self.cur = cur

        self.only_key_re = None
        k = getattr(opts, "key", None)
        if k:
            self.only_key_re = re.compile(k)

        self.only_vbucket_id = getattr(opts, "id", None)

    @staticmethod
    def check_base(opts, spec):
        k = getattr(opts, "key", None)
        if k:
            try:
                re.compile(k)
            except re.error:
                return "error: could not parse key regexp: " + k
        return 0

    @staticmethod
    def check_spec(source_bucket, source_node, opts, spec, cur):
        cur['seqno'] = {}
        cur['failoverlog'] = {}
        cur['snapshot'] = {}
        return 0

    def __repr__(self):
        return "%s(%s@%s)" % \
            (self.spec,
             self.source_bucket.get('name', ''),
             self.source_node.get('hostname', ''))

    def close(self):
        pass

    def skip(self, key, vbucket_id):
        if (self.only_key_re and not re.search(self.only_key_re, key)):
            logging.warn("skipping msg with key: " + str(key))
            return True

        if (self.only_vbucket_id is not None and
            self.only_vbucket_id != vbucket_id):
            logging.warn("skipping msg of vbucket_id: " + str(vbucket_id))
            return True

        return False

    def add_counter(self, key, val=1):
        self.cur[key] = self.cur.get(key, 0.0) + val


class Source(EndPoint):
    """Base class for all data sources."""

    @staticmethod
    def can_handle(opts, spec):
        assert False, "unimplemented"

    @staticmethod
    def check_base(opts, spec):
        rv = EndPoint.check_base(opts, spec)
        if rv != 0:
            return rv
        if getattr(opts, "source_vbucket_state", "active") != "active":
            return ("error: only --source-vbucket-state=active" +
                    " is supported by this source: %s") % (spec)
        return 0

    @staticmethod
    def check(opts, spec):
        """Subclasses can check preconditions before any pumping starts."""
        assert False, "unimplemented"

    @staticmethod
    def provide_design(opts, source_spec, source_bucket, source_map):
        assert False, "unimplemented"

    def provide_batch(self):
        assert False, "unimplemented"

    @staticmethod
    def total_msgs(opts, source_bucket, source_node, source_map):
        return 0, None # Subclasses can return an estimated number of msgs.


class Sink(EndPoint):
    """Base class for all data sinks."""

    # TODO: (2) Sink handles filtered restore by data.

    def __init__(self, opts, spec, source_bucket, source_node,
                 source_map, sink_map, ctl, cur):
        super(Sink, self).__init__(opts, spec, source_bucket, source_node,
                                   source_map, sink_map, ctl, cur)
        self.op = None

    @staticmethod
    def can_handle(opts, spec):
        assert False, "unimplemented"

    @staticmethod
    def check_base(opts, spec):
        rv = EndPoint.check_base(opts, spec)
        if rv != 0:
            return rv
        if getattr(opts, "destination_vbucket_state", "active") != "active":
            return ("error: only --destination-vbucket-state=active" +
                    " is supported by this destination: %s") % (spec)
        if getattr(opts, "destination_operation", None) is not None:
            return ("error: --destination-operation" +
                    " is not supported by this destination: %s") % (spec)
        return 0

    @staticmethod
    def check(opts, spec, source_map):
        """Subclasses can check preconditions before any pumping starts."""
        assert False, "unimplemented"

    @staticmethod
    def consume_design(opts, sink_spec, sink_map,
                       source_bucket, source_map, source_design):
        assert False, "unimplemented"

    def consume_batch_async(self, batch):
        """Subclasses should return a SinkBatchFuture."""
        assert False, "unimplemented"

    @staticmethod
    def check_source(opts, source_class, source_spec, sink_class, sink_spec):
        if source_spec == sink_spec:
            return "error: source and sink must be different;" \
                " source: " + source_spec + \
                " sink: " + sink_spec
        return None

    def operation(self):
        if not self.op:
            self.op = getattr(self.opts, "destination_operation", None)
            if not self.op:
                self.op = "set"
                if getattr(self.opts, "add", False):
                    self.op = "add"
        return self.op

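    # Single-slot hand-off between the pump thread and an optional sink
    # worker thread: push_next_batch() stores (batch, future) and sets
    # worker_go; pull_next_batch() waits, takes the pair, and clears the
    # event; future_done() signals the future back to the pump.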
    def init_worker(self, target):
        self.worker_go = threading.Event()
        self.worker_work = None # May be None or (batch, future) tuple.
        self.worker = threading.Thread(target=target, args=(self,),
                                       name="s" + threading.currentThread().getName()[1:])
        self.worker.daemon = True
        self.worker.start()

    def push_next_batch(self, batch, future):
        """Push batch/future to worker."""
        if not self.worker.isAlive():
            return "error: cannot use a dead worker", None

        self.worker_work = (batch, future)
        self.worker_go.set()
        return 0, future

    def pull_next_batch(self):
        """Worker calls this method to get the next batch/future."""
        self.worker_go.wait()
        batch, future = self.worker_work
        self.worker_work = None
        self.worker_go.clear()
        return batch, future

    def future_done(self, future, rv):
        """Worker calls this method to finish a batch/future."""
        if rv != 0:
            logging.error("error: async operation: %s on sink: %s" %
                          (rv, self))
        if future:
            future.done_rv = rv
            future.done.set()


# --------------------------------------------------

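# A msg is a plain tuple; this module reads/writes up to 11 fields:
#   (cmd, vbucket_id, key, flg, exp, cas, meta, val, seqno, dtype, nmeta)
# Shorter 8-field msgs omit the trailing seqno/dtype/nmeta.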
class Batch(object):
    """Holds a batch of data being transferred from source to sink."""

    def __init__(self, source):
        self.source = source
        self.msgs = []
        self.bytes = 0
        self.adjust_size = 0

    def append(self, msg, num_bytes):
        self.msgs.append(msg)
        self.bytes = self.bytes + num_bytes

    def size(self):
        return len(self.msgs)

    def msg(self, i):
        return self.msgs[i]

    def group_by_vbucket_id(self, vbuckets_num, rehash=0):
        """Returns dict of vbucket_id->[msgs] grouped by msg's vbucket_id."""
        g = defaultdict(list)
        for msg in self.msgs:
            cmd, vbucket_id, key = msg[:3]
            if vbucket_id == 0x0000ffff or rehash == 1:
                # Special case when the source did not supply a vbucket_id
                # (such as stdin source), so we calculate it.
                vbucket_id = (zlib.crc32(key) >> 16) & (vbuckets_num - 1)
                msg = (cmd, vbucket_id) + msg[2:]
            g[vbucket_id].append(msg)
        return g

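# Illustrative use of Batch.group_by_vbucket_id (hypothetical values):
#   groups = batch.group_by_vbucket_id(1024)
#   for vbucket_id, msgs in groups.iteritems():
#       ...  # route msgs to the server owning that vbucket_id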

class SinkBatchFuture(object):
    """Future completion of a sink consuming a batch."""

    def __init__(self, sink, batch):
        self.sink = sink
        self.batch = batch
        self.done = threading.Event()
        self.done_rv = None

    def wait_until_consumed(self):
        self.done.wait()
        return self.done_rv


# --------------------------------------------------

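# StdInSource consumes lines in memcached ascii style:
#   set <key> <flags> <exptime> <bytes>\r\n<value>\r\n
#   add <key> <flags> <exptime> <bytes>\r\n<value>\r\n
#   delete <key>\r\n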
class StdInSource(Source):
    """Reads batches from stdin in memcached ascii protocol."""

    def __init__(self, opts, spec, source_bucket, source_node,
                 source_map, sink_map, ctl, cur):
        super(StdInSource, self).__init__(opts, spec, source_bucket, source_node,
                                          source_map, sink_map, ctl, cur)
        self.f = sys.stdin

    @staticmethod
    def can_handle(opts, spec):
        return spec.startswith("stdin:") or spec == "-"

    @staticmethod
    def check(opts, spec):
        return 0, {'spec': spec,
                   'buckets': [{'name': 'stdin:',
                                'nodes': [{'hostname': 'N/A'}]}]}

    @staticmethod
    def provide_design(opts, source_spec, source_bucket, source_map):
        return 0, None

    def provide_batch(self):
        batch = Batch(self)

        batch_max_size = self.opts.extra['batch_max_size']
        batch_max_bytes = self.opts.extra['batch_max_bytes']

        vbucket_id = 0x0000ffff  # Sentinel: no vbucket_id; sinks may rehash.

        while (self.f and
               batch.size() < batch_max_size and
               batch.bytes < batch_max_bytes):
            line = self.f.readline()
            if not line:
                self.f = None
                return 0, batch

            parts = line.split(' ')
            if not parts:
                return "error: read empty line", None
            elif parts[0] == 'set' or parts[0] == 'add':
                if len(parts) != 5:
                    return "error: length of set/add line: " + line, None
                cmd = couchbaseConstants.CMD_TAP_MUTATION
                key = parts[1]
                flg = int(parts[2])
                exp = int(parts[3])
                num = int(parts[4])
                if num > 0:
                    val = self.f.read(num)
                    if len(val) != num:
                        return "error: value read failed at: " + line, None
                else:
                    val = ''
                end = self.f.read(2) # Read '\r\n'.
                if len(end) != 2:
                    return "error: value end read failed at: " + line, None

                if not self.skip(key, vbucket_id):
                    msg = (cmd, vbucket_id, key, flg, exp, 0, '', val, 0, 0, 0)
                    batch.append(msg, len(val))
            elif parts[0] == 'delete':
                if len(parts) != 2:
                    return "error: length of delete line: " + line, None
                cmd = couchbaseConstants.CMD_TAP_DELETE
                key = parts[1]
                if not self.skip(key, vbucket_id):
                    msg = (cmd, vbucket_id, key, 0, 0, 0, '', '', 0, 0, 0)
                    batch.append(msg, 0)
            else:
                return "error: expected set/add/delete but got: " + line, None

        if batch.size() <= 0:
            return 0, None

        return 0, batch


class StdOutSink(Sink):
    """Emits batches to stdout in memcached ascii protocol."""

    @staticmethod
    def can_handle(opts, spec):
        if spec.startswith("stdout:") or spec == "-":
            opts.threads = 1 # Force 1 thread to not overlap stdout.
            return True
        return False

    @staticmethod
    def check(opts, spec, source_map):
        return 0, None

    @staticmethod
    def check_base(opts, spec):
        if getattr(opts, "destination_vbucket_state", "active") != "active":
            return ("error: only --destination-vbucket-state=active" +
                    " is supported by this destination: %s") % (spec)

        op = getattr(opts, "destination_operation", None)
        if op not in [None, 'set', 'add', 'get']:
            return ("error: --destination-operation unsupported value: %s" +
                    "; use set, add, get") % (op)

        # Skip immediate superclass Sink.check_base(),
        # since StdOutSink can handle different destination operations.
        return EndPoint.check_base(opts, spec)

    @staticmethod
    def consume_design(opts, sink_spec, sink_map,
                       source_bucket, source_map, source_design):
        if source_design:
            logging.warn("warning: cannot save bucket design"
                         " on a stdout destination")
        return 0

    def consume_batch_async(self, batch):
        op = self.operation()
        op_mutate = op in ['set', 'add']

        stdout = sys.stdout
        msg_visitor = None

        opts_etc = getattr(self.opts, "etc", None)
        if opts_etc:
            stdout = opts_etc.get("stdout", sys.stdout)
            msg_visitor = opts_etc.get("msg_visitor", None)

        mcd_compatible = self.opts.extra.get("mcd_compatible", 1)
        msg_tuple_format = 0
        for msg in batch.msgs:
            if msg_visitor:
                msg = msg_visitor(msg)
            if not msg_tuple_format:
                msg_tuple_format = len(msg)
            cmd, vbucket_id, key, flg, exp, cas, meta, val = msg[:8]
            seqno = dtype = nmeta = 0
            if msg_tuple_format > 8:
                seqno, dtype, nmeta = msg[8:]
            if self.skip(key, vbucket_id):
                continue
            if dtype > 2:
                # Treat compound datatypes as snappy-compressed values;
                # uncompress best-effort, falling back to the raw value.
                try:
                    val = snappy.uncompress(val)
                except Exception:
                    pass
            try:
                if cmd == couchbaseConstants.CMD_TAP_MUTATION or \
                   cmd == couchbaseConstants.CMD_DCP_MUTATION:
                    if op_mutate:
                        # <op> <key> <flags> <exptime> <bytes> [noreply]\r\n
                        if mcd_compatible:
                            stdout.write("%s %s %s %s %s\r\n" %
                                         (op, key, flg, exp, len(val)))
                        else:
                            stdout.write("%s %s %s %s %s %s %s\r\n" %
                                         (op, key, flg, exp, len(val), seqno, dtype))
                        stdout.write(val)
                        stdout.write("\r\n")
                    elif op == 'get':
                        stdout.write("get %s\r\n" % (key))
                elif cmd == couchbaseConstants.CMD_TAP_DELETE or \
                     cmd == couchbaseConstants.CMD_DCP_DELETE:
                    if op_mutate:
                        stdout.write("delete %s\r\n" % (key))
                elif cmd == couchbaseConstants.CMD_GET:
                    stdout.write("get %s\r\n" % (key))
                else:
                    return "error: StdOutSink - unknown cmd: " + str(cmd), None
            except IOError:
                return "error: could not write to stdout", None

        stdout.flush()
        future = SinkBatchFuture(self, batch)
        self.future_done(future, 0)
        return 0, future


# --------------------------------------------------

CMD_STR = {
    couchbaseConstants.CMD_TAP_CONNECT: "TAP_CONNECT",
    couchbaseConstants.CMD_TAP_MUTATION: "TAP_MUTATION",
    couchbaseConstants.CMD_TAP_DELETE: "TAP_DELETE",
    couchbaseConstants.CMD_TAP_FLUSH: "TAP_FLUSH",
    couchbaseConstants.CMD_TAP_OPAQUE: "TAP_OPAQUE",
    couchbaseConstants.CMD_TAP_VBUCKET_SET: "TAP_VBUCKET_SET",
    couchbaseConstants.CMD_TAP_CHECKPOINT_START: "TAP_CHECKPOINT_START",
    couchbaseConstants.CMD_TAP_CHECKPOINT_END: "TAP_CHECKPOINT_END",
    couchbaseConstants.CMD_NOOP: "NOOP"
}

def get_username(username):
    return username or os.environ.get('CB_REST_USERNAME', '')

def get_password(password):
    return password or os.environ.get('CB_REST_PASSWORD', '')

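# Illustrative parse_spec() result (hypothetical spec and credentials),
# assuming opts and the CB_REST_USERNAME/CB_REST_PASSWORD environment
# variables don't already supply a username/password:
#   parse_spec(opts, "http://Administrator:pass@10.0.0.1:8091/pools", 8091)
#     => ("10.0.0.1", "8091", "Administrator", "pass", "/pools")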
def parse_spec(opts, spec, port):
    """Parse host, port, username, password, path from opts and spec."""

    # Example spec: http://Administrator:password@HOST:8091
    p = urlparse.urlparse(spec)

    # Example netloc: Administrator:password@HOST:8091
    # ParseResult tuple: (scheme, netloc, path, params, query, fragment)
    netloc = p[1]

    if not netloc: # When urlparse() can't parse non-http URI's.
        netloc = spec.split('://')[-1].split('/')[0]

    pair = netloc.split('@') # [ "user:pswd", "host:port" ].
    host_port = (pair[-1] + ":" + str(port)).split(':')
    host = host_port[0]
    port = host_port[1]
    try:
        int(port)
    except ValueError:
        logging.warn("\"%s\" is not an int; using default port 8091" % port)
        port = 8091

    username = get_username(opts.username)
    password = get_password(opts.password)
    if len(pair) > 1:
        username = username or (pair[0] + ':').split(':')[0]
        password = password or (pair[0] + ':').split(':')[1]

    return host, port, username, password, p[2]

def rest_request(host, port, user, pswd, path, method='GET', body='', reason='', headers=None):
    if reason:
        reason = "; reason: %s" % (reason)
    logging.debug("rest_request: %s@%s:%s%s%s" % (user, host, port, path, reason))
    conn = httplib.HTTPConnection(host, port)
    try:
        header = rest_headers(user, pswd, headers)
        conn.request(method, path, body, header)
        resp = conn.getresponse()
    except Exception, e:
        return ("error: could not access REST API: %s:%s%s" +
                "; please check source URL, server status, username (-u) and password (-p)" +
                "; exception: %s%s") % \
                (host, port, path, e, reason), None, None

    if resp.status in [200, 201, 202, 204, 302]:
        return None, conn, resp.read()

    conn.close()
    if resp.status == 401:
        return ("error: unable to access REST API: %s:%s%s" +
                "; please check source URL, server status, username (-u) and password (-p)%s") % \
                (host, port, path, reason), None, None

    return ("error: unable to access REST API: %s:%s%s" +
            "; please check source URL, server status, username (-u) and password (-p)" +
            "; response: %s%s") % \
            (host, port, path, resp.status, reason), None, None

def rest_headers(user, pswd, headers=None):
    if not headers:
        headers = {'Content-Type': 'application/json'}
    if user:
        auth = 'Basic ' + \
            string.strip(base64.encodestring(user + ':' + (pswd or '')))
        headers['Authorization'] = auth
    return headers

def rest_request_json(host, port, user, pswd, path, reason=''):
    err, conn, rest_json = rest_request(host, port, user, pswd, path,
                                        reason=reason)
    if err:
        return err, None, None
    if conn:
        conn.close()
    try:
        return None, rest_json, json.loads(rest_json)
    except ValueError, e:
        return ("error: could not decode JSON from REST API: %s:%s%s" +
                "; exception: %s" +
                "; please check URL, username (-u) and password (-p)") % \
                (host, port, path, e), None, None

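# rest_couchbase() expects the REST endpoint (default
# /pools/default/buckets) to return either a list of bucket objects or a
# single bucket object; each bucket must carry at least the name,
# bucketType, nodes, and nodeLocator fields.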
def rest_couchbase(opts, spec):
    spec = spec.replace('couchbase://', 'http://')

    spec_parts = parse_spec(opts, spec, 8091)
    host, port, user, pswd, path = spec_parts

    if not path or path == '/':
        path = '/pools/default/buckets'

    if int(port) in [11210, 11211]:
        return "error: invalid port number %s, which is reserved for moxi service" % port, None

    err, rest_json, rest_data = \
        rest_request_json(host, int(port), user, pswd, path)
    if err:
        return err, None

    if isinstance(rest_data, list):
        rest_buckets = rest_data
    else:
        rest_buckets = [rest_data] # Wrap single bucket in a list.

    buckets = []

    for bucket in rest_buckets:
        if not (bucket and
                bucket.get('name', None) and
                bucket.get('bucketType', None) and
                bucket.get('nodes', None) and
                bucket.get('nodeLocator', None)):
            return "error: unexpected JSON value from: " + spec + \
                " - did you provide the right source URL?", None

        if bucket['nodeLocator'] == 'vbucket' and \
                (bucket['bucketType'] == 'membase' or
                 bucket['bucketType'] == 'couchbase'):
            buckets.append(bucket)
        else:
            logging.warn("skipping bucket that is not a couchbase-bucket: " +
                         bucket['name'])

    if user is None or pswd is None:
        # Check if we have buckets other than the default one
        if len(rest_buckets) > 0:
            if len(rest_buckets) > 1 or rest_buckets[0].get('name', None) != "default":
                return "error: REST username (-u) and password (-p) are required " + \
                       "to access all bucket(s)", None
    return 0, {'spec': spec, 'buckets': buckets, 'spec_parts': spec_parts}

def filter_bucket_nodes(bucket, spec_parts):
    host, port = spec_parts[:2]
    if host in ['localhost', '127.0.0.1']:
        host = get_ip()
    host_port = host + ':' + str(port)
    return filter(lambda n: n.get('hostname') == host_port,
                  bucket['nodes'])

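# get_ip() reads this node's address from the Couchbase ip/ip_start
# files; those files may hold an erlang-style node name (e.g.,
# "ns_1@10.0.0.1"), hence the split on '@' below.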
def get_ip():
    ip = None
    for fname in ['/opt/couchbase/var/lib/couchbase/ip_start',
                  '/opt/couchbase/var/lib/couchbase/ip',
                  '../var/lib/couchbase/ip_start',
                  '../var/lib/couchbase/ip']:
        try:
            f = open(fname, 'r')
            ip = f.read().strip()
            f.close()
            if ip and len(ip):
                if '@' in ip:
                    ip = ip.split('@')[1]
                break
        except IOError:
            pass
    if not ip or not len(ip):
        ip = '127.0.0.1'
    return ip

def find_source_bucket_name(opts, source_map):
    """If the caller didn't specify a bucket_source and
       there's only one bucket in the source_map, use that."""
    source_bucket = getattr(opts, "bucket_source", None)
    if (not source_bucket and
        source_map and
        source_map['buckets'] and
        len(source_map['buckets']) == 1):
        source_bucket = source_map['buckets'][0]['name']
    if not source_bucket:
        return "error: please specify a bucket_source", None
    logging.debug("source_bucket: " + source_bucket)
    return 0, source_bucket

def find_sink_bucket_name(opts, source_bucket):
    """Default bucket_destination to the same as bucket_source."""
    sink_bucket = getattr(opts, "bucket_destination", None) or source_bucket
    if not sink_bucket:
        return "error: please specify a bucket_destination", None
    logging.debug("sink_bucket: " + sink_bucket)
    return 0, sink_bucket

def mkdirs(targetpath):
    upperdirs = os.path.dirname(targetpath)
    if upperdirs and not os.path.exists(upperdirs):
        try:
            os.makedirs(upperdirs)
        except OSError:
            return "Cannot create parent directories for file: %s" % targetpath
    return 0
