1#!/usr/bin/env python
2
3import copy
4import logging
5import optparse
6import os
7import random
8import string
9import sys
10import threading
11
12import pump
13import pump_bfd
14import pump_csv
15import pump_cb
16import pump_gen
17import pump_mbf
18import pump_mc
19import pump_tap
20import pump_dcp
21
22from pump import PumpingStation
23
# Prefer the external pysqlite2 binding when present, otherwise fall back
# to the stdlib sqlite3 module; abort the process if neither imports.
import_stmts = (
    'from pysqlite2 import dbapi2 as sqlite3',
    'import sqlite3',
)
for status, stmt in enumerate(import_stmts):
    try:
        # exec() call form works under both Python 2 and Python 3
        # (the bare "exec stmt" statement is Python-2-only syntax).
        exec(stmt)
        break
    except ImportError:
        status = None  # marks this candidate as failed; loop tries the next
if status is None:
    sys.exit("Error: could not import sqlite3 module")
36
def exit_handler(err):
    """Report *err* (if any) on stderr, then hard-exit the process.

    Exit status is 1 when an error was given, 0 otherwise.  os._exit()
    terminates immediately, skipping normal interpreter cleanup.
    """
    if err:
        sys.stderr.write(str(err) + "\n")
    os._exit(1 if err else 0)
43
class Transfer:
    """Base class for 2.0 Backup/Restore/Transfer.

    Subclasses (Backup, Restore) override the alias/usage strings,
    option handling, and handler discovery; main() drives the whole
    source -> sink pumping run via pump.PumpingStation.
    """

    def __init__(self):
        # Tool name used in log output, plus the labels used for the two
        # required positional arguments in opt_parse() error messages.
        self.name = "cbtransfer"
        self.source_alias = "source"
        self.sink_alias = "destination"
        self.usage = \
            "%prog [options] source destination\n\n" \
            "Transfer couchbase cluster data from source to destination.\n\n" \
            "Examples:\n" \
            "  %prog http://SOURCE:8091 /backups/backup-42\n" \
            "  %prog /backups/backup-42 http://DEST:8091\n" \
            "  %prog /backups/backup-42 couchbase://DEST:8091\n" \
            "  %prog http://SOURCE:8091 http://DEST:8091\n" \
            "  %prog couchstore-files:///opt/couchbase/var/lib/couchbase/data/ /backup-XXX\n" \
            "  %prog couchstore-files:///opt/couchbase/var/lib/couchbase/data/ couchbase://DEST:8091\n"


    def main(self, argv, opts_etc=None):
        """Entry point: parse argv, resolve handlers, run the transfer.

        Returns an error string on failure; otherwise the result of
        PumpingStation.run().
        """
        # Shorten the main thread's name so log lines stay compact.
        if threading.currentThread().getName() == "MainThread":
            threading.currentThread().setName("mt")

        err, opts, source, sink = self.opt_parse(argv)
        if err:
            return err

        if opts_etc:
            opts.etc = opts_etc # Used for unit tests, etc.

        # Unique-ish name for this run, exposed to handlers via opts.
        # NOTE(review): string.letters is Python-2-only
        # (string.ascii_letters in Python 3).
        process_name = os.path.basename(argv[0]) + "-" + "".join(random.sample(string.letters, 16))
        setattr(opts, "process_name", process_name)

        logging.info(self.name + "...")
        logging.info(" source : %s", source)
        logging.info(" sink   : %s", sink)
        # opts.safe is the credentials-masked copy built by opt_parse_helper().
        logging.info(" opts   : %s", opts.safe)

        source_class, sink_class = self.find_handlers(opts, source, sink)
        if not source_class:
            return "error: unknown type of source: " + source
        if not sink_class:
            return "error: unknown type of sink: " + sink
        # Give the sink a chance to veto the source before any data moves.
        err = sink_class.check_source(opts, source_class, source, sink_class, sink)
        if err:
            return err

        try:
            pumpStation = pump.PumpingStation(opts, source_class, source,
                                              sink_class, sink)
            rv = pumpStation.run()
            self.aggregate_stats(pumpStation.cur)
            return rv
        except KeyboardInterrupt:
            return "interrupted."

    def aggregate_stats(self, cur):
        """Hook for subclasses to post-process transfer stats; no-op here."""
        return 0

    def check_opts(self, opts):
        """Hook for subclasses to validate parsed options; None means OK."""
        return None

    def opt_parse(self, argv):
        """Parse argv into a (err, opts, source, sink) 4-tuple.

        err is None on success; on any failure the remaining slots are
        None and err carries the message to show the user.
        """
        p = self.opt_parser()
        opts, rest = p.parse_args(argv[1:])
        # Exactly two positional arguments are required: source and sink.
        if len(rest) != 2:
            p.print_help()
            return "\nError: please provide both a %s and a %s" % \
                (self.source_alias, self.sink_alias), \
                None, None, None

        err = self.check_opts(opts)
        if err:
            return err, None, None, None

        # Normalize the -x extras into a dict of floats, and build a
        # credentials-masked copy of opts that is safe to log.
        opts.extra = opt_parse_extra(opts.extra, self.opt_extra_defaults())
        opts.safe = opt_parse_helper(opts)

        return None, opts, rest[0], rest[1]

    def opt_parser(self):
        """Build the optparse parser, including the -x extras help group."""
        p = optparse.OptionParser(usage=self.usage)
        opt_extra_help(p, self.opt_extra_defaults())

        self.opt_parser_options(p)
        return p

    def opt_parser_options(self, p):
        """Register transfer-specific options; subclasses override this."""
        p.add_option("-b", "--bucket-source",
                     action="store", type="string", default=None,
                     help="""Single named bucket from source cluster to transfer""")
        p.add_option("-B", "--bucket-destination",
                     action="store", type="string", default=None,
                     help="""Single named bucket on destination cluster which receives transfer.
This allows you to transfer to a bucket with a different name
as your source bucket. If you do not provide defaults to the
same name as the bucket-source""")
        self.opt_parser_options_common(p)
        p.add_option("", "--single-node",
                     action="store_true", default=False,
                     help="""Transfer from a single server node in a source cluster,
This single server node is a source node URL""")
        p.add_option("", "--source-vbucket-state",
                     action="store", type="string", default='active',
                     help="""Only transfer from source vbuckets in this state,
such as 'active' (default) or 'replica'.
Must be used with Couchbase cluster as source""")
        p.add_option("", "--destination-vbucket-state",
                     action="store", type="string", default='active',
                     help="""Only transfer to destination vbuckets in this state,
such as 'active' (default) or 'replica'.
Must be used with Couchbase cluster as source""")
        p.add_option("", "--destination-operation",
                     action="store", type="string", default=None,
                     help="""Perform this operation on transfer.
'set' will override an existing document,
'add' will not override, 'get' will load all keys transferred
from a source cluster into the caching layer at the destination""")

    def opt_parser_options_common(self, p):
        """Register options shared by transfer, backup and restore."""
        p.add_option("-i", "--id",
                     action="store", type="int", default=None,
                     help="""Transfer only items that match a vbucketID""")
        p.add_option("-k", "--key",
                     action="store", type="string", default=None,
                     help="""Transfer only items with keys that match a regexp""")
        p.add_option("", "--vbucket-list",
                     action="store", type="string", default=None,
                     help=optparse.SUPPRESS_HELP)
        p.add_option("-n", "--dry-run",
                     action="store_true", default=False,
                     help="""No actual transfer; just validate parameters, files,
                             connectivity and configurations""")
        p.add_option("-u", "--username",
                     action="store", type="string", default=None,
                     help="REST username for source cluster or server node")
        p.add_option("-p", "--password",
                     action="store", type="string", default=None,
                     help="REST password for source cluster or server node")
        p.add_option("-t", "--threads",
                     action="store", type="int", default=4,
                     help="""Number of concurrent workers threads performing the transfer""")
        p.add_option("-v", "--verbose",
                     action="count", default=0,
                     help="verbose logging; more -v's provide more verbosity. Max is -vvv")
        p.add_option("-x", "--extra",
                     action="store", type="string", default=None,
                     help="""Provide extra, uncommon config parameters;
                             comma-separated key=val(,key=val)* pairs""")

    def opt_extra_defaults(self):
        """Map of -x extra option name -> (default value, help text)."""
        return {
            "batch_max_size":  (1000,   "Transfer this # of documents per batch"),
            "batch_max_bytes": (400000, "Transfer this # of bytes per batch"),
            "cbb_max_mb":      (100000, "Split backup file on destination cluster if it exceeds MB"),
            "max_retry":       (10,     "Max number of sequential retries if transfer fails"),
            "report":          (5,      "Number batches transferred before updating progress bar in console"),
            "report_full":     (2000,   "Number batches transferred before emitting progress information in console"),
            "recv_min_bytes":  (4096,   "Amount of bytes for every TCP/IP call transferred"),
            "try_xwm":         (1,      "Transfer documents with metadata. 0 should only be used if you transfer from 1.8.x to 1.8.x"),
            "nmv_retry":       (1,      "0 or 1, where 1 retries transfer after a NOT_MY_VBUCKET message"),
            "rehash":          (0,      "For value 1, rehash the partition id's of each item; \
this is needed when transferring data between clusters with different number of partitions, \
such as when transferring data from an OSX server to a non-OSX cluster"),
            "data_only":       (0,      "For value 1, only transfer data from a backup file or cluster"),
            "design_doc_only": (0,      "For value 1, transfer design documents only from a backup file or cluster"),
            "conflict_resolve":(1,      "By default, disable conflict resolution."),
            "seqno":           (0,      "By default, start seqno from beginning."),
            "mcd_compatible":  (1,      "For value 0, display extended fields for stdout output."),
            "uncompress":      (0,      "For value 1, restore data in uncompressed mode"),
            "backoff_cap":     (10,     "Max backoff time during rebalance period"),
            "flow_control":    (1,      "For value 0, disable flow control to improve throughput"),
            }

    def find_handlers(self, opts, source, sink):
        """Resolve (source_class, sink_class); either may be None when the
        endpoint spec is not recognized by any registered handler."""
        return (PumpingStation.find_handler(opts, source, SOURCES),
                PumpingStation.find_handler(opts, sink, SINKS))
221
222
class Backup(Transfer):
    """Entry point for 2.0 cbbackup."""

    def __init__(self):
        self.name = "cbbackup"
        self.source_alias = "source"
        self.sink_alias = "backup_dir"
        self.usage = \
            "%prog [options] source backup_dir\n\n" \
            "Online backup of a couchbase cluster or server node.\n\n" \
            "Examples:\n" \
            "   These are a full backup plus two incremental backups for a cluster. \n" \
            "       %prog http://HOST:8091 /backup-42\n" \
            "       %prog http://HOST:8091 /backup-42\n" \
            "       %prog http://HOST:8091 /backup-42\n\n" \
            "   These are a full backup plus two differentials and one accumulative for a single node. \n" \
            "       %prog couchbase://HOST:8091 /backup-43 [-m full] --single-node\n" \
            "       %prog couchbase://HOST:8091 /backup-43 [-m diff] --single-node\n" \
            "       %prog couchbase://HOST:8091 /backup-43 [-m diff] --single-node\n" \
            "       %prog couchbase://HOST:8091 /backup-43 -m accu --single-node\n\n" \
            "Note: A full backup task is always triggered for a new sink location\n" \
            "   no matter what backup mode is specified.\n"

    def opt_parser_options(self, p):
        """Register cbbackup-specific options (bucket, single-node, mode)."""
        p.add_option("-b", "--bucket-source",
                     action="store", type="string", default=None,
                     help="""single bucket from source to backup""")
        p.add_option("", "--single-node",
                     action="store_true", default=False,
                     help="""use a single server node from the source only,
                             not all server nodes from the entire cluster;
                             this single server node is defined by the source URL""")
        # The diff/accu incremental modes are offered only when the optional
        # pump_bfd2 module is importable; otherwise only 'full' is supported.
        try:
            import pump_bfd2
            p.add_option("-m", "--mode",
                        action="store", type="string", default="diff",
                        help="backup mode: full, diff or accu [default:%default]")
        except ImportError:
            p.add_option("-m", "--mode",
                        action="store", type="string", default="full",
                        help="backup mode: full")

        Transfer.opt_parser_options_common(self, p)

    def find_handlers(self, opts, source, sink):
        """Resolve handlers; same probing as the Transfer base class."""
        return PumpingStation.find_handler(opts, source, SOURCES), \
               PumpingStation.find_handler(opts, sink, SINKS)

    def check_opts(self, opts):
        """Reject -m values other than full/diff/accu; None means OK."""
        mode = getattr(opts, "mode", None)
        if mode:
            if mode not in ["full", "diff", "accu"]:
                return "\nError: option mode has to be 'full', 'diff' or 'accu'"
        return None
277
class Restore(Transfer):
    """Entry point for 2.0 cbrestore."""

    # TODO: (1) Restore - opt_parse handle 1.8 backwards compatible args.

    def __init__(self):
        self.name = "cbrestore"
        self.source_alias = "backup_dir"
        self.sink_alias = "destination"
        self.usage = \
            "%prog [options] backup_dir destination\n\n" \
            "Restores a single couchbase bucket.\n\n" \
            "Please first create the destination / bucket before restoring.\n\n" \
            "Examples:\n" \
            "  %prog /backups/backup-42 http://HOST:8091 \\\n" \
            "    --bucket-source=default --from-date=2014-01-20 --to-date=2014-03-31\n" \
            "  %prog /backups/backup-42 couchbase://HOST:8091 \\\n" \
            "    --bucket-source=default\n" \
            "  %prog /backups/backup-42 memcached://HOST:11211 \\\n" \
            "    --bucket-source=sessions --bucket-destination=sessions2"

    def opt_parser_options(self, p):
        """Register cbrestore-specific options (add, buckets, date range)."""
        p.add_option("-a", "--add",
                     action="store_true", default=False,
                     help="""use add instead of set to not overwrite existing
                             items in the destination""")
        p.add_option("-b", "--bucket-source",
                     action="store", type="string", default=None,
                     help="""single bucket from the backup_dir to restore;
                             if the backup_dir only contains a single bucket,
                             then that bucket will be automatically used""")
        p.add_option("-B", "--bucket-destination",
                     action="store", type="string", default=None,
                     help="""when --bucket-source is specified, overrides the
                             destination bucket name; this allows you to restore
                             to a different bucket; defaults to the same as the
                             bucket-source""")
        p.add_option("", "--from-date",
                    action="store", type="string", default=None,
                    help="""restore data from the date specified as yyyy-mm-dd. By default,
all data from the very beginning will be restored""")
        p.add_option("", "--to-date",
                    action="store", type="string", default=None,
                    help="""restore data till the date specified as yyyy-mm-dd. By default,
all data that are collected will be restored""")
        Transfer.opt_parser_options_common(self, p)

        # TODO: (1) cbrestore parameter --create-design-docs=y|n
        # TODO: (1) cbrestore parameter -d DATA, --data=DATA
        # TODO: (1) cbrestore parameter --validate-only
        # TODO: (1) cbrestore parameter -H HOST, --host=HOST
        # TODO: (1) cbrestore parameter -p PORT, --port=PORT
        # TODO: (1) cbrestore parameter option to override expiration?

    def find_handlers(self, opts, source, sink):
        """The source side is always a backup-file-directory (BFD) reader;
        only the sink endpoint is probed against the registered SINKS."""
        return pump_bfd.BFDSource, PumpingStation.find_handler(opts, sink, SINKS)
334
335
336# --------------------------------------------------
337
def opt_parse_helper(opts):
    """Configure logging verbosity from the -v count, then return a deep
    copy of opts with any credentials masked (safe to write to logs)."""
    level = logging.WARN
    if opts.verbose >= 2:
        level = logging.DEBUG
    elif opts.verbose >= 1:
        level = logging.INFO
    logging.basicConfig(format=pump.LOGGING_FORMAT, level=level)

    masked = copy.deepcopy(opts)
    for secret in ("username", "password"):
        if getattr(masked, secret):
            setattr(masked, secret, "<xxx>")
    return masked
352
def opt_parse_extra(extra, extra_defaults):
    """Convert an extra string (comma-separated key=val pairs) into
       a dict, using default values from extra_defaults dict.

       extra_defaults maps key -> (default_value, help_text); every
       returned value is coerced to float.  Exits the process on an
       unknown key.
    """
    # Appending '=' before splitting guarantees at least two fields,
    # even for a bare key written without a value.
    extra_in = dict([(x[0], x[1]) for x in
                     [(kv + '=').split('=') for kv in
                      (extra or "").split(',')]])
    # Plain dict iteration instead of the Python-2-only .iteritems() /
    # .iterkeys(), so this works under both Python 2 and Python 3.
    for k in extra_in:
        if k and not extra_defaults.get(k):
            sys.exit("error: unknown extra option: " + k)
    return dict([(k, float(extra_in.get(k, extra_defaults[k][0])))
                 for k in extra_defaults])
364
def opt_extra_help(parser, extra_defaults):
    """Attach an option group to *parser* documenting every available -x
       extra parameter as "key=default (help)", sorted by key.

       extra_defaults maps key -> (default_value, help_text).
    """
    # sorted(dict) iterates keys in both Python 2 and Python 3, unlike
    # the Python-2-only .iterkeys().
    extra_help = "; ".join(["%s=%s (%s)" %
                           (k, extra_defaults[k][0], extra_defaults[k][1])
                           for k in sorted(extra_defaults)])

    group = optparse.OptionGroup(parser, "Available extra config parameters (-x)",
                        extra_help)
    parser.add_option_group(group)
373
374# --------------------------------------------------
375
# Registered handler classes, in probe order: find_handlers() asks
# PumpingStation.find_handler() to match an endpoint spec against these.
SOURCES = [pump_bfd.BFDSource,
           pump_csv.CSVSource,
           pump_gen.GenSource,
           pump_mbf.MBFSource,
           pump_dcp.DCPStreamSource,
           pump_tap.TAPDumpSource,
           pump.StdInSource]

SINKS = [pump_bfd.BFDSink,
         pump_mc.MCSink,
         pump_cb.CBSink,
         pump_csv.CSVSink,
         pump.StdOutSink]

# Optional handlers: each module may be absent from a given install, in
# which case its handlers are silently skipped.
try:
    import pump_sfd
    SOURCES.append(pump_sfd.SFDSource)
    SINKS.append(pump_sfd.SFDSink)
except ImportError:
    pass

try:
    import pump_bson
    SOURCES.append(pump_bson.BSONSource)
except ImportError:
    pass

try:
    import pump_json
    SOURCES.append(pump_json.JSONSource)
except ImportError:
    pass

try:
    import pump_bfd2
    # Inserted at the front so BFDSinkEx is preferred over the plain
    # pump_bfd.BFDSink when both match.
    SINKS.insert(0, pump_bfd2.BFDSinkEx)
except ImportError:
    pass
414
415# TODO: (1) pump_transfer - use QUIET commands
416# TODO: (1) pump_transfer - verify that nth replica got the msg
417# TODO: (1) pump_transfer - ability to TAP a non-active or replica vbucket / MB-4583
418# TODO: (10) pump_transfer - incremental backup/restore
419
# When run directly this module acts as the generic transfer tool; the
# process exit code is whatever Transfer.main() returns.
if __name__ == '__main__':
    sys.exit(Transfer().main(sys.argv))
422
423