xref: /3.0.3-GA/couchbase-cli/pump_bfd.py (revision 36a52d63)
#!/usr/bin/env python

import copy
import glob
import logging
import os
import simplejson as json
import string
import sys
import datetime
import time
import urllib
import fnmatch

import couchbaseConstants
import pump

import_stmts = (
    'from pysqlite2 import dbapi2 as sqlite3',
    'import sqlite3',
)
for status, stmt in enumerate(import_stmts):
    try:
        exec stmt
        break
    except ImportError:
        status = None
if status is None:
    sys.exit("Error: could not import sqlite3 module")

CBB_VERSION = [2004, 2014] # sqlite pragma user version.
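# Index 0 (user_version 2004) identifies the pre-3.0 backup schema, which
# lacks the seqno, dtype and meta_size columns; index 1 (2014) is the schema
# written by create_db() below, which has them. connect_db() returns the
# position of a file's pragma user_version in this list, and that index picks
# the matching SELECT statement in BFDSource.provide_batch().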

class BFD:
    """Mixin for backup-file/directory EndPoint helper methods."""
    NUM_VBUCKET = 1024

    def bucket_name(self):
        return self.source_bucket['name']

    def node_name(self):
        return self.source_node['hostname']

    @staticmethod
    def design_path(spec, bucket_name):
        bucket_path = os.path.normpath(spec) + "/bucket-" + urllib.quote_plus(bucket_name)
        if os.path.isdir(bucket_path):
            return bucket_path + '/design.json'
        else:
            path, dirs = BFD.find_latest_dir(spec, None)
            if path:
                path, dirs = BFD.find_latest_dir(path, None)
                if path:
                    return path + "/bucket-" + urllib.quote_plus(bucket_name) + '/design.json'
        return bucket_path + '/design.json'

    @staticmethod
    def construct_dir(parent, bucket_name, node_name):
        return os.path.join(parent,
                    "bucket-" + urllib.quote_plus(bucket_name),
                    "node-" + urllib.quote_plus(node_name))
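    # Doctest-style sketch of the resulting layout (values are hypothetical):
    #   >>> BFD.construct_dir('/backups/2014-06-01T120000Z-full',
    #   ...                   'default', '127.0.0.1:8091')
    #   '/backups/2014-06-01T120000Z-full/bucket-default/node-127.0.0.1%3A8091'
    # urllib.quote_plus escapes characters such as ':' so bucket and node
    # names always yield filesystem-safe directory names.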

    @staticmethod
    def check_full_dbfiles(parent_dir):
        return glob.glob(os.path.join(parent_dir, "data-*.cbb"))

    @staticmethod
    def get_predecessors(parent_dir):
        try:
            json_file = open(os.path.join(parent_dir, "meta.json"), "r")
            json_data = json.load(json_file)
            json_file.close()
            return json_data["pred"]
        except IOError:
            return []

    @staticmethod
    def get_failover_log(parent_dir):
        filepath = os.path.join(parent_dir, "failover.json")
        if os.path.isfile(filepath):
            json_file = open(filepath, "r")
            json_data = json.load(json_file)
            json_file.close()
            return json_data
        else:
            return {}

    @staticmethod
    def list_files(opts, spec, bucket, node, pattern):
        # Stub: superseded by the full implementation in BFDSource.list_files
        # below. Return the same (rv, file_list) shape so any caller can
        # unpack it safely.
        file_list = []
        return 0, file_list

    @staticmethod
    def write_json_file(parent_dir, filename, output_data):
        filepath = os.path.join(parent_dir, filename)
        json_data = {}
        if os.path.isfile(filepath):
            # Load existing metadata generated by a previous batch run.
            json_file = open(filepath, "r")
            json_data = json.load(json_file)
            json_file.close()

        for i in range(BFD.NUM_VBUCKET):
            if output_data.get(i):
                str_index = str(i)
                historic_data_exist = json_data.get(str_index)
                if historic_data_exist:
                    # Historic data shares the same data type as incoming data,
                    # so it is sufficient to check the type of the historic
                    # value only.
                    if isinstance(json_data[str_index], int):
                        # For seqno, keep the highest value.
                        if json_data[str_index] < output_data[i]:
                            json_data[str_index] = output_data[i]
                    elif isinstance(json_data[str_index], list):
                        # For each vbucket, keep the superset of its references.
                        if len(json_data[str_index]) < len(output_data[i]):
                            json_data[str_index] = output_data[i]
                else:
                    # Bookkeep the incoming entry.
                    json_data[str_index] = output_data[i]

        json_file = open(filepath, "w")
        json.dump(json_data, json_file, ensure_ascii=False)
        json_file.close()
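    # Merge semantics, sketched with hypothetical values: for integer entries
    # (seqno.json), an existing {"5": 100} merged with an incoming {5: 250}
    # yields {"5": 250}, since the highest seqno wins; for list entries
    # (failover.json, snapshot_markers.json), the longer list wins.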

    @staticmethod
    def db_dir(spec, bucket_name, node_name, tmstamp=None, mode=None, new_session=False):
        parent_dir = os.path.normpath(spec) + \
                        '/bucket-' + urllib.quote_plus(bucket_name) + \
                        '/node-' + urllib.quote_plus(node_name)
        if os.path.isdir(parent_dir):
            return parent_dir

        # check 3.0 directory structure
        if not tmstamp:
            tmstamp = time.strftime("%Y-%m-%dT%H%M%SZ", time.gmtime())
        parent_dir = os.path.normpath(spec)
        rootpath, dirs = BFD.find_latest_dir(parent_dir, None)
        if not rootpath or not mode or mode == "full":
            # No backup root exists yet, or a full backup was requested.
            path = os.path.join(parent_dir, tmstamp, tmstamp+"-full")
            return BFD.construct_dir(path, bucket_name, node_name)

        # check if any full backup exists
        path, dirs = BFD.find_latest_dir(rootpath, "full")
        if not path:
            path = os.path.join(rootpath, tmstamp+"-full")
            return BFD.construct_dir(path, bucket_name, node_name)
        else:
            # further check full backup for this bucket and node
            path = BFD.construct_dir(path, bucket_name, node_name)
            if not os.path.isdir(path):
                return path

        if mode.find("diff") >= 0:
            path, dirs = BFD.find_latest_dir(rootpath, "diff")
            if not path or new_session:
                path = os.path.join(rootpath, tmstamp+"-diff")
                return BFD.construct_dir(path, bucket_name, node_name)
            else:
                path = BFD.construct_dir(path, bucket_name, node_name)
                if not os.path.isdir(path):
                    return path
                else:
                    path = os.path.join(rootpath, tmstamp+"-diff")
                    return BFD.construct_dir(path, bucket_name, node_name)
        elif mode.find("accu") >= 0:
            path, dirs = BFD.find_latest_dir(rootpath, "accu")
            if not path or new_session:
                path = os.path.join(rootpath, tmstamp+"-accu")
                return BFD.construct_dir(path, bucket_name, node_name)
            else:
                path = BFD.construct_dir(path, bucket_name, node_name)
                if not os.path.isdir(path):
                    return path
                else:
                    path = os.path.join(rootpath, tmstamp+"-accu")
                    return BFD.construct_dir(path, bucket_name, node_name)
        else:
            return parent_dir

    @staticmethod
    def find_latest_dir(parent_dir, mode):
        all_subdirs = []
        latest_dir = None
        if not os.path.isdir(parent_dir):
            return latest_dir, all_subdirs
        for d in os.listdir(parent_dir):
            if not mode or d.find(mode) >= 0:
                bd = os.path.join(parent_dir, d)
                if os.path.isdir(bd):
                    all_subdirs.append(bd)
        if all_subdirs:
            all_subdirs = sorted(all_subdirs, key=os.path.getmtime, reverse=True)
            latest_dir = all_subdirs[0]
        return latest_dir, all_subdirs
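    # Note: "latest" is decided by directory mtime (sorted newest-first), not
    # by the timestamp embedded in the directory name, so touching an older
    # backup directory can change which one is picked.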

    @staticmethod
    def find_seqno(opts, spec, bucket_name, node_name, mode):
        seqno = {}
        dep = {}
        dep_list = []
        failover_log = {}
        snapshot_markers = {}
        for i in range(BFD.NUM_VBUCKET):
            seqno[str(i)] = 0
            dep[i] = None
            failover_log[i] = None
            snapshot_markers[i] = None

        file_list = []
        failoverlog_list = []
        snapshot_list = []
        seqno_list = []
        parent_dir = os.path.normpath(spec)

        if mode == "full":
            return seqno, dep_list, failover_log, snapshot_markers
        timedir, latest_dirs = BFD.find_latest_dir(parent_dir, None)
        if not timedir:
            return seqno, dep_list, failover_log, snapshot_markers
        fulldir, latest_dirs = BFD.find_latest_dir(timedir, "full")
        if not fulldir:
            return seqno, dep_list, failover_log, snapshot_markers

        path = BFD.construct_dir(fulldir, bucket_name, node_name)
        if not os.path.isdir(path):
            return seqno, dep_list, failover_log, snapshot_markers

        file_list.extend(recursive_glob(path, 'data-*.cbb'))
        failoverlog_list.extend(recursive_glob(path, 'failover.json'))
        snapshot_list.extend(recursive_glob(path, 'snapshot_markers.json'))
        seqno_list.extend(recursive_glob(path, 'seqno.json'))

        accudir, accu_dirs = BFD.find_latest_dir(timedir, "accu")
        if accudir:
            path = BFD.construct_dir(accudir, bucket_name, node_name)
            if os.path.isdir(path):
                file_list.extend(recursive_glob(path, 'data-*.cbb'))
                failoverlog_list.extend(recursive_glob(path, 'failover.json'))
                snapshot_list.extend(recursive_glob(path, 'snapshot_markers.json'))
                seqno_list.extend(recursive_glob(path, 'seqno.json'))
        if mode.find("diff") >= 0:
            diffdir, diff_dirs = BFD.find_latest_dir(timedir, "diff")
            if diff_dirs:
                for d in diff_dirs:
                    path = BFD.construct_dir(d, bucket_name, node_name)
                    if os.path.isdir(path):
                        file_list.extend(recursive_glob(path, 'data-*.cbb'))
                        failoverlog_list.extend(recursive_glob(path, 'failover.json'))
                        snapshot_list.extend(recursive_glob(path, 'snapshot_markers.json'))
                        seqno_list.extend(recursive_glob(path, 'seqno.json'))

        for x in sorted(seqno_list):
            json_file = open(x, "r")
            json_data = json.load(json_file)
            json_file.close()

            for vbid, seq in json_data.iteritems():
                if not seq:
                    continue
                if seqno.get(vbid) < seq:
                    seqno[vbid] = seq

        for log_file in sorted(failoverlog_list):
            json_file = open(log_file, "r")
            json_data = json.load(json_file)
            json_file.close()

            for vbid, flogs in json_data.iteritems():
                if not flogs:
                    continue
                elif vbid not in failover_log.keys():
                    failover_log[vbid] = flogs
                else:
                    for logpair in flogs:
                        if not failover_log[vbid]:
                            failover_log[vbid] = [logpair]
                        elif logpair not in failover_log[vbid]:
                            failover_log[vbid].append(logpair)
        for snapshot in sorted(snapshot_list):
            json_file = open(snapshot, "r")
            json_data = json.load(json_file)
            json_file.close()

            for vbid, markers in json_data.iteritems():
                snapshot_markers[vbid] = markers

        for i in range(BFD.NUM_VBUCKET):
            if dep[i] and dep[i] not in dep_list:
                dep_list.append(dep[i])
        return seqno, dep_list, failover_log, snapshot_markers
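    # As written, dep[i] is initialized to None above and never reassigned,
    # so the dep_list returned here is always empty; predecessor tracking
    # happens instead via meta.json (see get_predecessors and
    # BFDSource.list_files).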

# --------------------------------------------------

class BFDSource(BFD, pump.Source):
    """Can read from backup-file/directory layout."""

    def __init__(self, opts, spec, source_bucket, source_node,
                 source_map, sink_map, ctl, cur):
        super(BFDSource, self).__init__(opts, spec, source_bucket, source_node,
                                        source_map, sink_map, ctl, cur)
        self.done = False
        self.files = None
        self.cursor_db = None

    @staticmethod
    def can_handle(opts, spec):
        if os.path.isdir(spec):
            dirs = glob.glob(spec + "/bucket-*/node-*/data-*.cbb")
            if dirs:
                return True
            path, dirs = BFD.find_latest_dir(spec, None)
            if path:
                path, dirs = BFD.find_latest_dir(path, "full")
                if path:
                    return glob.glob(path + "*/bucket-*/node-*/data-*.cbb")
        return False

    @staticmethod
    def check(opts, spec):
        spec = os.path.normpath(spec)
        if not os.path.isdir(spec):
            return "error: backup_dir is not a directory: " + spec, None

        buckets = []

        bucket_dirs = glob.glob(spec + "/bucket-*")
        if not bucket_dirs:
            # check 3.0 directory structure
            path, dirs = BFD.find_latest_dir(spec, None)
            if not path:
                return "error: no backup directory found: " + spec, None
            latest_dir, dirs = BFD.find_latest_dir(path, "full")
            if not latest_dir:
                return "error: no full backup directory found: " + spec, None
            bucket_dirs = glob.glob(latest_dir + "/bucket-*")

        for bucket_dir in sorted(bucket_dirs):
            if not os.path.isdir(bucket_dir):
                return "error: not a bucket directory: " + bucket_dir, None

            bucket_name = os.path.basename(bucket_dir)[len("bucket-"):].strip()
            bucket_name = urllib.unquote_plus(bucket_name)
            if not bucket_name:
                return "error: bucket_name too short: " + bucket_dir, None

            bucket = { 'name': bucket_name, 'nodes': [] }
            buckets.append(bucket)

            node_dirs = glob.glob(bucket_dir + "/node-*")
            for node_dir in sorted(node_dirs):
                if not os.path.isdir(node_dir):
                    return "error: not a node directory: " + node_dir, None

                node_name = os.path.basename(node_dir)[len("node-"):].strip()
                node_name = urllib.unquote_plus(node_name)
                if not node_name:
                    return "error: node_name too short: " + node_dir, None

                bucket['nodes'].append({ 'hostname': node_name })

        return 0, { 'spec': spec,
                    'buckets': buckets }
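    # On success, the second element describes the discovered layout, e.g.
    # (hypothetical values):
    #   (0, {'spec': '/backups',
    #        'buckets': [{'name': 'default',
    #                     'nodes': [{'hostname': '127.0.0.1:8091'}]}]})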

    @staticmethod
    def provide_design(opts, source_spec, source_bucket, source_map):
        fname = BFD.design_path(source_spec, source_bucket['name'])
        if os.path.exists(fname):
            try:
                f = open(fname, 'r')
                d = f.read()
                f.close()
                return 0, d
            except IOError, e:
                return ("error: could not read design: %s" +
                        "; exception: %s") % (fname, e), None
        return 0, None

    def provide_batch(self):
        if self.done:
            return 0, None

        batch = pump.Batch(self)

        batch_max_size = self.opts.extra['batch_max_size']
        batch_max_bytes = self.opts.extra['batch_max_bytes']

        s = ["SELECT cmd, vbucket_id, key, flg, exp, cas, meta, val FROM cbb_msg",
             "SELECT cmd, vbucket_id, key, flg, exp, cas, meta, val, seqno, dtype, meta_size FROM cbb_msg"]

        if self.files is None: # None != [], as self.files will shrink to [].
            g = glob.glob(BFD.db_dir(self.spec, self.bucket_name(), self.node_name()) + "/data-*.cbb")
            if not g:
                # check 3.0 file structure
                rv, file_list = BFDSource.list_files(self.opts,
                                                     self.spec,
                                                     self.bucket_name(),
                                                     self.node_name(),
                                                     "data-*.cbb")
                if rv != 0:
                    return rv, None
                from_date = getattr(self.opts, "from_date", None)
                if from_date:
                    from_date = datetime.datetime.strptime(from_date, "%Y-%m-%d")

                to_date = getattr(self.opts, "to_date", None)
                if to_date:
                    to_date = datetime.datetime.strptime(to_date, "%Y-%m-%d")
                g = []
                for f in file_list:
                    mtime = datetime.datetime.fromtimestamp(os.path.getmtime(f))
                    if (not from_date or mtime >= from_date) and (not to_date or mtime <= to_date):
                        g.append(f)
            self.files = sorted(g)
        try:
            ver = 0
            while (not self.done and
                   batch.size() < batch_max_size and
                   batch.bytes < batch_max_bytes):
                if self.cursor_db is None:
                    if not self.files:
                        self.done = True
                        return 0, batch

                    rv, db, ver = connect_db(self.files[0], self.opts, CBB_VERSION)
                    if rv != 0:
                        return rv, None
                    self.files = self.files[1:]

                    cursor = db.cursor()
                    cursor.execute(s[ver])

                    self.cursor_db = (cursor, db)

                cursor, db = self.cursor_db

                row = cursor.fetchone()
                if row:
                    vbucket_id = row[1]
                    key = row[2]
                    val = row[7]

                    if self.skip(key, vbucket_id):
                        continue
                    msg = (row[0], row[1], row[2], row[3], row[4],
                           int(row[5]), # CAS as 64-bit integer, not string.
                           row[6], # revid as 64-bit integer, too.
                           row[7])
                    if ver == 1:
                        msg = msg + (row[8], row[9], row[10])
                    else:
                        msg = msg + (0, 0, 0)
                    batch.append(msg, len(val))
                else:
                    if self.cursor_db:
                        self.cursor_db[0].close()
                        self.cursor_db[1].close()
                    self.cursor_db = None

            return 0, batch

        except Exception, e:
            self.done = True
            if self.cursor_db:
                self.cursor_db[0].close()
                self.cursor_db[1].close()
            self.cursor_db = None

            return "error: exception reading backup file: " + str(e), None

    @staticmethod
    def total_msgs(opts, source_bucket, source_node, source_map):
        t = 0
        file_list = glob.glob(BFD.db_dir(
                                 source_map['spec'],
                                 source_bucket['name'],
                                 source_node['hostname']) + "/data-*.cbb")
        if not file_list:
            # check 3.0 directory structure
            rv, file_list = BFDSource.list_files(opts,
                                        source_map['spec'],
                                        source_bucket['name'],
                                        source_node['hostname'],
                                        "data-*.cbb")
            if rv != 0:
                return rv, None

        for x in sorted(file_list):
            rv, db, ver = connect_db(x, opts, CBB_VERSION)
            if rv != 0:
                return rv, None

            cur = db.cursor()
            # TODO: (1) BFDSource - COUNT(*) is not indexed.
            cur.execute("SELECT COUNT(*) FROM cbb_msg;")
            t = t + cur.fetchone()[0]
            cur.close()
            db.close()

        return 0, t

    @staticmethod
    def list_files(opts, spec, bucket, node, pattern):
        file_list = []
        prec_list = []
        path, dirs = BFD.find_latest_dir(spec, None)
        if not path:
            return "error: no valid data in path: %s" % spec, None

        path, dirs = BFD.find_latest_dir(path, None)
        if not path:
            return 0, file_list

        for d in dirs:
            latest_dir = BFD.construct_dir(d, bucket, node)
            file_list.extend(glob.glob(os.path.join(latest_dir, pattern)))
            for p in BFD.get_predecessors(latest_dir):
                prec_list.append(os.path.dirname(p))
            if len(prec_list) > 0:
                break

        while len(prec_list) > 0:
            deps = glob.glob(os.path.join(prec_list[0], pattern))
            for d in deps:
                if d not in file_list:
                    file_list.append(d)
            for p in BFD.get_predecessors(prec_list[0]):
                dirname = os.path.dirname(p)
                if dirname not in prec_list:
                    prec_list.append(dirname)
            prec_list = prec_list[1:]

        return 0, file_list
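    # The loop above is a breadth-first walk over incremental backups:
    # prec_list acts as a FIFO queue, each directory's meta.json names its
    # predecessor ("pred") directories, and matching files from every
    # reachable predecessor are folded into file_list exactly once.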

# --------------------------------------------------

class BFDSink(BFD, pump.Sink):
    """Can write to backup-file/directory layout."""

    def __init__(self, opts, spec, source_bucket, source_node,
                 source_map, sink_map, ctl, cur):
        super(BFDSink, self).__init__(opts, spec, source_bucket, source_node,
                                      source_map, sink_map, ctl, cur)
        self.mode = "full"
        self.init_worker(BFDSink.run)

    @staticmethod
    def run(self):
        """Worker thread to asynchronously store incoming batches into db."""
        s = "INSERT INTO cbb_msg (cmd, vbucket_id, key, flg, exp, cas, meta, val, seqno, \
            dtype, meta_size) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
        db = None
        cbb = 0       # Current cbb file NUM, like data-NUM.cbb.
        cbb_bytes = 0 # Current cbb msg value bytes total.
        db_dir = None
        cbb_max_bytes = \
            self.opts.extra.get("cbb_max_mb", 100000) * 1024 * 1024
        _, dep, _, _ = BFD.find_seqno(self.opts,
                                      self.spec,
                                      self.bucket_name(),
                                      self.node_name(),
                                      self.mode)
        seqno_map = {}
        for i in range(BFD.NUM_VBUCKET):
            seqno_map[i] = 0
        while not self.ctl['stop']:
            batch, future = self.pull_next_batch()
            if not batch:
                if db:
                    db.close()
                return self.future_done(future, 0)

            if db and cbb_bytes >= cbb_max_bytes:
                db.close()
                db = None
                cbb += 1
                cbb_bytes = 0
                db_dir = None

            if not db:
                rv, db, db_dir = self.create_db(cbb)
                if rv != 0:
                    return self.future_done(future, rv)

                meta_file = os.path.join(db_dir, "meta.json")
                json_file = open(meta_file, "w")
                json.dump({'pred': dep}, json_file, ensure_ascii=False)
                json_file.close()

            if (self.bucket_name(), self.node_name()) in self.cur['failoverlog']:
                BFD.write_json_file(db_dir,
                                    "failover.json",
                                    self.cur['failoverlog'][(self.bucket_name(), self.node_name())])
            if (self.bucket_name(), self.node_name()) in self.cur['snapshot']:
                BFD.write_json_file(db_dir,
                                    "snapshot_markers.json",
                                    self.cur['snapshot'][(self.bucket_name(), self.node_name())])
            try:
                c = db.cursor()

                for msg in batch.msgs:
                    cmd, vbucket_id, key, flg, exp, cas, meta, val, seqno, dtype, nmeta = msg
                    if self.skip(key, vbucket_id):
                        continue
                    if cmd not in [couchbaseConstants.CMD_TAP_MUTATION,
                                   couchbaseConstants.CMD_TAP_DELETE,
                                   couchbaseConstants.CMD_DCP_MUTATION,
                                   couchbaseConstants.CMD_DCP_DELETE]:
                        if db:
                            db.close()
                        return self.future_done(future,
                                                "error: BFDSink bad cmd: " +
                                                str(cmd))
                    c.execute(s, (cmd, vbucket_id,
                                  sqlite3.Binary(key),
                                  flg, exp, str(cas),
                                  sqlite3.Binary(str(meta)),
                                  sqlite3.Binary(val),
                                  seqno,
                                  dtype,
                                  nmeta))
                    cbb_bytes += len(val)
                    if seqno_map[vbucket_id] < seqno:
                        seqno_map[vbucket_id] = seqno
                db.commit()
                BFD.write_json_file(db_dir, "seqno.json", seqno_map)
                self.future_done(future, 0) # No return, to keep looping.

            except sqlite3.Error, e:
                return self.future_done(future, "error: db error: " + str(e))
            except Exception, e:
                return self.future_done(future, "error: db exception: " + str(e))

    @staticmethod
    def can_handle(opts, spec):
        spec = os.path.normpath(spec)
        return (os.path.isdir(spec) or (not os.path.exists(spec) and
                                        os.path.isdir(os.path.dirname(spec))))

    @staticmethod
    def check(opts, spec, source_map):
        # TODO: (2) BFDSink - check disk space.
        # TODO: (2) BFDSink - check should report on pre-creatable directories.

        spec = os.path.normpath(spec)

        # Check that the directory exists and is writable.
        if os.path.exists(spec):
            if not os.path.isdir(spec):
                return "error: backup directory is not a directory: " + spec, None
            if not os.access(spec, os.W_OK):
                return "error: backup directory is not writable: " + spec, None
            return 0, None

        # Or, that the parent directory exists.
        parent_dir = os.path.dirname(spec)

        if not os.path.exists(parent_dir):
            return "error: missing parent directory: " + parent_dir, None
        if not os.path.isdir(parent_dir):
            return "error: parent directory is not a directory: " + parent_dir, None
        if not os.access(parent_dir, os.W_OK):
            return "error: parent directory is not writable: " + parent_dir, None
        return 0, None

    @staticmethod
    def consume_design(opts, sink_spec, sink_map,
                       source_bucket, source_map, source_design):
        if source_design:
            fname = BFD.design_path(sink_spec,
                                    source_bucket['name'])
            try:
                rv = pump.mkdirs(fname)
                if rv:
                    return rv, None
                f = open(fname, 'w')
                f.write(source_design)
                f.close()
            except IOError, e:
                return ("error: could not write design: %s" +
                        "; exception: %s") % (fname, e), None
        return 0

    def consume_batch_async(self, batch):
        return self.push_next_batch(batch, pump.SinkBatchFuture(self, batch))

    def create_db(self, num):
        rv, d = self.mkdirs()
        if rv != 0:
            return rv, None, None

        path = d + "/data-%s.cbb" % (string.rjust(str(num), 4, '0'))
        rv, db = create_db(path, self.opts)
        if rv != 0:
            return rv, None, None

        try:
            tmp_map = copy.deepcopy(self.source_map)
            if 'spec_parts' in tmp_map:
                del tmp_map['spec_parts']
            cur = db.cursor()
            cur.execute("INSERT INTO cbb_meta (key, val) VALUES (?, ?)",
                        ("source_bucket.json",
                         json.dumps(cleanse(self.source_bucket))))
            cur.execute("INSERT INTO cbb_meta (key, val) VALUES (?, ?)",
                        ("source_node.json",
                         json.dumps(cleanse(tmp_map))))
            cur.execute("INSERT INTO cbb_meta (key, val) VALUES (?, ?)",
                        ("source_map.json",
                         json.dumps(cleanse(self.source_map))))
            cur.execute("INSERT INTO cbb_meta (key, val) VALUES (?, ?)",
                        ("start.datetime", time.strftime("%Y/%m/%d-%H:%M:%S")))
            db.commit()
        except sqlite3.Error, e:
            return "error: create_db error: " + str(e), None, None
        except Exception, e:
            return "error: create_db exception: " + str(e), None, None

        return 0, db, d
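    # create_db() is called once per rotated backup file: run() above closes
    # the current data-<NUM>.cbb and bumps NUM whenever the accumulated value
    # bytes pass the cbb_max_mb limit (default 100000 MB).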

    def mkdirs(self):
        """Make directories, if not already, with structure like...
           <spec>/
             YYYY-MM-DDThhmmssZ/
                YYYY-MM-DDThhmmssZ-full/
                   bucket-<BUCKETNAME>/
                     design.json
                     node-<NODE>/
                       data-<XXXX>.cbb
                YYYY-MM-DDThhmmssZ-diff/
                   bucket-<BUCKETNAME>/
                     design.json
                     node-<NODE>/
                       data-<XXXX>.cbb

           CBSE-1052: There appears to be a race condition in os.mkdir. Suppose
           two or more threads simultaneously try to create the same directory,
           or different directories with a common non-existent ancestor. Each
           checks that the directory doesn't exist, then each invokes os.mkdir;
           all but one will throw OSError due to the underlying EEXIST error,
           which is why the error paths below re-check os.path.isdir before
           giving up."""

        spec = os.path.normpath(self.spec)
        if not os.path.isdir(spec):
            try:
                os.mkdir(spec)
            except OSError, e:
                if not os.path.isdir(spec):
                    return "error: could not mkdir: %s; exception: %s" % (spec, e), None

        new_session = self.ctl['new_session']
        self.ctl['new_session'] = False
        d = BFD.db_dir(self.spec,
                       self.bucket_name(),
                       self.node_name(),
                       self.ctl['new_timestamp'],
                       getattr(self.opts, "mode", "diff"),
                       new_session)
        if not os.path.isdir(d):
            try:
                os.makedirs(d)
            except OSError, e:
                if not os.path.isdir(d):
                    return "error: could not mkdirs: %s; exception: %s" % (d, e), None
        return 0, d


# --------------------------------------------------

def create_db(db_path, opts):
    try:
        logging.debug("  create_db: " + db_path)

        rv, db, ver = connect_db(db_path, opts, [0])
        if rv != 0:
            logging.debug("connect_db failed for: " + db_path)
            return rv, None

        # The cas column is type text, not integer, because sqlite
        # integer is 63-bits instead of 64-bits.
        db.executescript("""
                  BEGIN;
                  CREATE TABLE cbb_msg
                     (cmd integer,
                      vbucket_id integer,
                      key blob,
                      flg integer,
                      exp integer,
                      cas text,
                      meta blob,
                      val blob,
                      seqno integer,
                      dtype integer,
                      meta_size integer);
                  CREATE TABLE cbb_meta
                     (key text,
                      val blob);
                  pragma user_version=%s;
                  COMMIT;
                """ % (CBB_VERSION[1]))

        return 0, db

    except Exception, e:
        return "error: create_db exception: " + str(e), None

def connect_db(db_path, opts, version):
    try:
        # TODO: (1) BFD - connect_db - use pragma page_size.
        # TODO: (1) BFD - connect_db - use pragma max_page_count.
        # TODO: (1) BFD - connect_db - use pragma journal_mode.

        logging.debug("  connect_db: " + db_path)

        db = sqlite3.connect(db_path)
        db.text_factory = str

        cur = db.execute("pragma user_version").fetchall()[0][0]
        if cur not in version:
            logging.debug("dbpath is not empty: " + db_path)
            return "error: unexpected db user version: " + \
                str(cur) + " vs " + str(version) + \
                ", maybe a backup directory created by older releases is reused", \
                None, None

        return 0, db, version.index(cur)

    except Exception, e:
        # Keep the same three-element shape as the success path, so callers
        # can always unpack (rv, db, ver).
        return "error: connect_db exception: " + str(e), None, None

def cleanse(d):
    """Elide passwords from hierarchy of dict/list's."""
    return cleanse_helper(copy.deepcopy(d))

def cleanse_helper(d):
    """Recursively, destructively elide passwords from hierarchy of dict/list's."""
    if type(d) is list:
        for x in d:
            cleanse_helper(x)
    elif type(d) is dict:
        for k, v in d.iteritems():
            if "assword" in k:
                d[k] = '<...ELIDED...>'
            else:
                d[k] = cleanse_helper(v)
    return d

def recursive_glob(rootdir='.', pattern='*'):
    return [os.path.join(base, filename)
            for base, dirnames, filenames in os.walk(rootdir)
            for filename in filenames
            if fnmatch.fnmatch(filename, pattern)]
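# Doctest-style sketch (paths are hypothetical):
#   >>> recursive_glob('/backups/2014-06-01T120000Z', 'data-*.cbb')
#   ['/backups/2014-06-01T120000Z/2014-06-01T120000Z-full/bucket-default/node-127.0.0.1%3A8091/data-0000.cbb']
# Unlike glob.glob, this walks every subdirectory, so the pattern only needs
# to match the file name, not the directories leading to it.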

def local_to_utc(t):
    secs = time.mktime(t.timetuple())
    return datetime.datetime.utcfromtimestamp(secs)
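# Note: time.mktime interprets t as a naive local time, so the conversion
# depends on the host timezone and on the DST rules in effect at that moment.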
855