xref: /6.0.3/couchbase-cli/cbbackupwrapper (revision aa8fe140)
1#!/usr/bin/env python
2# -*-python-*-
3
4import pump_transfer
5import pump
6
7import base64
8import optparse
9import os
10import platform
11import Queue
12import re
13import json
14import subprocess
15import sys
16import threading
17import time
18import urllib2
19
20from cluster_manager import ClusterManager
21
22"""Written by Daniel Owen owend@couchbase.com on 27 June 2014
23Version 1.4    Last updated 10 July 2014
24
25The current implementation of cbbackup that comes with Couchbase Server 2.5.1
uses only one thread per node.  Therefore when using cbbackup with the single-node
27parameter we are limited to one thread - this impacts performance.
28
29This script provides a wrapper to invoke multiple cbbackup processes.
30It automatically detects which buckets and vbuckets are
on the node.  It allows the user to specify how many vbuckets to backup in a single
32cbbackup process and then invokes the necessary number of processes.
33An example invocation is as follows:
34
35python cbbackupwrapper.py http://127.0.0.1:8091 ../backup/ --single-node -n 4 \
36-u Administrator -p myPassword --path /opt/couchbbase/bin/  -v
37
38This will backup all the buckets on node 127.0.0.1 into ../backup
39It will backup 4 vbuckets per cbbackup process
40Access to the cluster is authenticated using username=Administrator and
password=myPassword, and cbbackup will be found in /opt/couchbase/bin
42
43Run python cbbackupwrapper -h for more information.
44
45See the cbrestorewrapper.py script for restoring backups made with this script."""
46
47bucketList = []
48vbucketList = []
49backup_complete = False
50process_queue = Queue.Queue()
51lock = threading.Lock()
52
53def _exit_if_errors(errors):
54    if errors:
55        for error in errors:
56            print "ERROR: " + error
57        sys.exit(1)
58
59def opt_extra_help(parser, extra_defaults):
60    extra_help = "; ".join(["%s=%s (%s)" %
61                           (k, extra_defaults[k][0], extra_defaults[k][1])
62                           for k in sorted(extra_defaults.iterkeys())])
63
64    group = optparse.OptionGroup(parser, "Available extra config parameters (-x)",
65                        extra_help)
66    parser.add_option_group(group)
67
68def opt_extra_defaults():
69    return {
70        "batch_max_size":  (1000,   "Transfer this # of documents per batch"),
71        "batch_max_bytes": (400000, "Transfer this # of bytes per batch"),
72        "cbb_max_mb":      (100000, "Split backup file on destination cluster if it exceeds MB"),
73        "max_retry":       (10,     "Max number of sequential retries if transfer fails"),
74        "report":          (5,      "Number batches transferred before updating progress bar in console"),
75        "report_full":     (2000,   "Number batches transferred before emitting progress information in console"),
76        "recv_min_bytes":  (4096,   "Amount of bytes for every TCP/IP call transferred"),
77        "rehash":          (0,      "For value 1, rehash the partition id's of each item; \
78this is needed when transferring data between clusters with different number of partitions, \
79such as when transferring data from an OSX server to a non-OSX cluster"),
80        "conflict_resolve":(1,      "By default, enable conflict resolution."),
81        "data_only":       (0,      "For value 1, only transfer data from a backup file or cluster"),
82        "design_doc_only": (0,      "For value 1, transfer design documents only from a backup file or cluster"),
83        "seqno":           (0,      "By default, start seqno from beginning."),
84        "uncompress":      (0,      "For value 1, restore data in uncompressed mode"),
85        "backoff_cap":     (10,     "Max backoff time during rebalance period"),
86        "flow_control":    (1,      "For value 0, disable flow control to improve throughput"),
87        "dcp_consumer_queue_length": (1000,"A DCP client needs a queue for incoming documents/messages. A large length is more efficient, but memory proportional to length*avg. doc size. Below length 150, performance degrades significantly."),
88    }
89
90def opt_parse_extra(extra, extra_defaults):
91    """Convert an extra string (comma-separated key=val pairs) into
92       a dict, using default values from extra_defaults dict."""
93    extra_in = dict([(x[0], x[1]) for x in
94                     [(kv + '=').split('=') for kv in
95                      (extra or "").split(',')]])
96    for k, v in extra_in.iteritems():
97        if k and not extra_defaults.get(k):
98            sys.exit("error: unknown extra option: " + k)
99    return dict([(k, float(extra_in.get(k, extra_defaults[k][0])))
100                 for k in extra_defaults.iterkeys()])
101
102def argumentParsing():
103    usage = "usage: %prog CLUSTER BACKUPDIR OPTIONS"
104    parser = optparse.OptionParser(usage)
105    opt_extra_help(parser, opt_extra_defaults())
106
107    parser.add_option('-b', '--bucket-source', default='',
108                        help='Specify the bucket to backup.  Defaults to all buckets')
109    parser.add_option('--single-node', action='store_true',
110                        default=False, help='use a single server node from the source only')
111    parser.add_option('-u', '--username', default='Administrator',
112                        help='REST username for source cluster or server node. Default is Administrator')
113    parser.add_option('-p', '--password', default='PASSWORD',
114                        help='REST password for source cluster or server node. Defaults to PASSWORD')
115    parser.add_option("-s", "--ssl",
116                     action="store_true", default=False,
117                     help="Transfer data with SSL enabled")
118    parser.add_option('-v', '--verbose', action='store_true',
119                        default=False, help='Enable verbose messaging')
120    parser.add_option('--path', default='.',
121                        help='Specify the path to cbbackup. Defaults to current directory')
122    parser.add_option('--port', default='11210',
123                        help='Specify the bucket port.  Defaults to 11210')
124    parser.add_option('-n', '--number', default='100',
125                        help='Specify the number of vbuckets per process. Defaults to 100')
126    parser.add_option('-P', '--parallelism', default='1',
127                        help='Number of vbucket backup jobs to run at a time. Defaults to 1')
128    parser.add_option('-x', '--extra', default=None,
129                        help="""Provide extra, uncommon config parameters;
130                        comma-separated key=val(,key=val)* pairs""")
131    try:
132        import pump_bfd2
133        parser.add_option("-m", "--mode",
134                        action="store", type="string", default="diff",
135                        help="backup mode: full, diff or accu [default:%default]")
136    except ImportError:
137        parser.add_option("-m", "--mode",
138                        action="store", type="string", default="full",
139                        help=optparse.SUPPRESS_HELP)
140
141    options, rest = parser.parse_args()
142    if len(rest) != 2:
143        parser.print_help()
144        sys.exit("\nError: please provide both cluster IP and backup directory path.")
145
146    opt_parse_extra(options.extra, opt_extra_defaults())
147
148    return options, rest[0], rest[1]
149
150def findAllVbucketsForBucket(node, bucket, restport, username, password, single_node):
151    cluster = "http://" + node + ":" + restport
152    rest = ClusterManager(cluster, username, password, False, False, None, False)
153
154    result, errors = rest.get_bucket(bucket)
155    _exit_if_errors(errors)
156
157    if not single_node:
158        return range(len(result['vBucketServerMap']['vBucketMap']))
159    else:
160        thisNode = None
161        for node in result["nodes"]:
162            if "thisNode" in node and node["thisNode"]:
163                thisNode = node["hostname"].split(":")[0]
164                break
165
166        if thisNode == None:
167            _exit_if_errors(["Unable to find vbuckets for %s, could not locate thisNode" % cluster])
168
169        serverIdx = None
170        serverList = result['vBucketServerMap']['serverList']
171        for index in range(len(serverList)):
172            if serverList[index].split(":")[0] == thisNode:
173                serverIdx = index
174                break
175
176        if serverIdx == None:
177            _exit_if_errors(["Unable to find vbuckets for %s, thisNode not in serverList" % cluster])
178
179        vbucketList = []
180        vbucketMap = result['vBucketServerMap']['vBucketMap']
181        for vbid in range(len(vbucketMap)):
182            if vbucketMap[vbid][0] == serverIdx:
183                vbucketList.append(vbid)
184
185        return vbucketList
186
187# Get the buckets that exist on the cluster
188def getBuckets(node, rest_port, username, password):
189    request = urllib2.Request(
190        'http://' + node + ':' + rest_port + '/pools/default/buckets')
191    base64string = base64.encodestring(
192        '%s:%s' % (username, password)).replace('\n', '')
193    request.add_header('Authorization', 'Basic %s' % base64string)
194    try:
195        response = urllib2.urlopen(request)
196    except:
197        print('Authorization failed.  Please check username and password.')
198        sys.exit(1)
199    bucketsOnCluster = []
200    data = json.loads(response.read())
201    for item in data:
202        if item['bucketType'] == 'memcached':
203            print('skipping bucket that is not a couchbase-bucket: {0}'.format(item['name']))
204        else:
205            bucket = item['name']
206            bucketsOnCluster.append(bucket)
207    return bucketsOnCluster
208
209def consumer(id, verbose):
210    while True:
211        try:
212            if backup_complete:
213                return
214
215            commandline = process_queue.get(block=True, timeout=2)
216            if verbose:
217                with lock:
218                    print "T(%s): %s" %(id, commandline)
219            p = subprocess.Popen(commandline, shell=True)
220            p.wait()
221            if p.returncode == 1:
222                with lock:
223                    print 'Error with backup for running %s' % commandline
224            process_queue.task_done()
225            time.sleep(1)
226        except Exception:
227            if process_queue.empty():
228                return
229            print 'Exception ' + str(e)
230
231if __name__ == '__main__':
232    # Parse the arguments given.
233    args, cluster, backupDir = argumentParsing()
234
235    backup_exe = 'cbbackup'
236    if platform.system() == "Windows":
237        backup_exe = 'cbbackup.exe'
238
239    # Remove any white-spaces from start and end of strings
240    backupDir = backupDir.strip()
241    path = args.path.strip()
242    if path == ".":
243        path = os.path.abspath(path)
244    if backupDir == ".":
245        backupDir = os.path.abspath(backupDir)
246
247    # Check to see if root backup directory exists
248    if not os.path.isdir(backupDir):
249        try:
250            os.makedirs(backupDir)
251        except:
252            sys.exit("Cannot create backup root directory:%s" % backupDir)
253
254    # Check to see if path is correct
255    if not os.path.isdir(path):
256        print 'The path to cbbackup does not exist'
257        print 'Please run with a different path'
258        sys.exit(1)
259    if not os.path.isfile(os.path.join(path, backup_exe)):
260        print 'cbbackup could not be found in ' + path
261        sys.exit(1)
262
263    # Check to see if log directory exists if not create it
264    dir = os.path.join(backupDir, 'logs')
265    try:
266        os.stat(dir)
267    except:
268        try:
269            os.mkdir(dir)
270        except:
271            print('Error trying to create directory ' + dir)
272            sys.exit(1)
273
274    # Separate out node and REST port
275    matchObj = re.match(r'^http://(.*):(\d+)$', cluster, re.I)
276    if matchObj:
277        node = matchObj.group(1)
278        rest = matchObj.group(2)
279    else:
280        print("Please enter the source as http://hostname:port")
281        print("For example http://localhost:8091 or http://127.0.0.1:8091")
282        sys.exit(1)
283
284    # Check to see if backing-up all buckets or just a specified bucket
285    if args.bucket_source == '':
286        bucketList = getBuckets(
287            node, rest, args.username, args.password)
288    else:
289        # Check that the bucket exists
290        for item in getBuckets(node, rest, args.username, args.password):
291            if item == args.bucket_source:
292                bucketList.append(args.bucket_source)
293
294        if len(bucketList) == 0:
295            print 'Bucket ' + args.bucket_source + ' does not exist'
296            print 'Please enter a different bucket'
297            sys.exit(1)
298
299    # For each bucket
300    for item in bucketList:
301        perbucketvbucketlist = findAllVbucketsForBucket(
302            node, item, rest, args.username, args.password, args.single_node)
303        for item in perbucketvbucketlist:
304            if item not in vbucketList:
305                vbucketList.append(item)
306
307    # If a bucket was specfified then set-up the string to pass to cbbackup.
308    specific_bucket = ''
309    if len(bucketList) == 1:
310        specific_bucket = ' -b ' + bucketList[0]
311
312    extra_options = ''
313    if args.extra:
314        extra_options = ' -x ' + args.extra
315
316    mode_options = ''
317    if args.mode:
318        mode_options = ' -m ' + args.mode
319
320    ssl_option = ''
321    if args.ssl:
322        ssl_option = ' -s '
323
324    worker_threads = []
325    for i in range(int(args.parallelism)):
326        t = threading.Thread(target=consumer, args=(i, args.verbose,))
327        t.daemon = True
328        t.start()
329        worker_threads.append(t)
330
331    # Group the number of vbuckets per process
332    print 'Waiting for the backup to complete...'
333    processes = []
334    for i in range(0, len(vbucketList), int(args.number)):
335        chunk = vbucketList[i:i + int(args.number)]
336        vbucketsname = str(chunk[0]) + '-' + str(chunk[-1])
337        command_line = '"' + os.path.join(path, backup_exe) + '"' + ' -v -t 1 --vbucket-list=' \
338            + ''.join(str(chunk).split()) + ' http://' + node + ':' + rest + ' ' \
339            + '"' + os.path.join(backupDir, vbucketsname) + '"' + ' -u ' + args.username \
340            + ' -p ' + args.password + extra_options + mode_options + ssl_option + specific_bucket \
341            + ' 2> ' + '"' + os.path.join(backupDir, 'logs', vbucketsname) + '.err' + '"'
342        process_queue.put(command_line)
343
344    process_queue.join()
345    backup_complete = True
346
347    for worker in worker_threads:
348        worker.join()
349
350    with lock:
351        print 'SUCCESSFULLY COMPLETED!'
352