xref: /3.0.3-GA/couchbase-cli/pump_bfd.py (revision 65baf351)
#!/usr/bin/env python

import copy
import glob
import logging
import os
import simplejson as json
import string
import sys
import datetime
import time
import urllib
import fnmatch

import couchbaseConstants
import pump

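# Prefer the external pysqlite2 module when available; otherwise fall back
# to the stdlib sqlite3 module.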
import_stmts = (
    'from pysqlite2 import dbapi2 as sqlite3',
    'import sqlite3',
)
for status, stmt in enumerate(import_stmts):
    try:
        exec stmt
        break
    except ImportError:
        status = None
if status is None:
    sys.exit("Error: could not import sqlite3 module")

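# Index 0 (2004) is the older schema without seqno/dtype/meta_size columns;
# index 1 (2014) is the 3.0 schema written by create_db() below.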
CBB_VERSION = [2004, 2014] # sqlite pragma user version.

class BFD:
    """Mixin for backup-file/directory EndPoint helper methods."""
    NUM_VBUCKET = 1024

    def bucket_name(self):
        return self.source_bucket['name']

    def node_name(self):
        return self.source_node['hostname']

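    # The path helpers below understand both the pre-3.0 flat layout
    # (<spec>/bucket-*/node-*) and the 3.0 timestamped layout
    # (<spec>/<timestamp>/<timestamp>-{full,diff,accu}/bucket-*/node-*).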
    @staticmethod
    def design_path(spec, bucket_name):
        bucket_path = os.path.normpath(spec) + "/bucket-" + urllib.quote_plus(bucket_name)
        if os.path.isdir(bucket_path):
            return bucket_path + '/design.json'
        else:
            path, dirs = BFD.find_latest_dir(spec, None)
            if path:
                path, dirs = BFD.find_latest_dir(path, None)
                if path:
                    return path + "/bucket-" + urllib.quote_plus(bucket_name) + '/design.json'
        return bucket_path + '/design.json'

    @staticmethod
    def construct_dir(parent, bucket_name, node_name):
        return os.path.join(parent,
                    "bucket-" + urllib.quote_plus(bucket_name),
                    "node-" + urllib.quote_plus(node_name))

    @staticmethod
    def check_full_dbfiles(parent_dir):
        return glob.glob(os.path.join(parent_dir, "data-*.cbb"))

    @staticmethod
    def get_predecessors(parent_dir):
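        # meta.json's "pred" list (written by BFDSink.run) records the backup
        # directories that the data files in parent_dir depend on.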
        try:
            json_file = open(os.path.join(parent_dir, "meta.json"), "r")
            json_data = json.load(json_file)
            json_file.close()
            return json_data["pred"]
        except IOError:
            return []

    @staticmethod
    def get_failover_log(parent_dir):
        filepath = os.path.join(parent_dir, "failover.json")
        if os.path.isfile(filepath):
            json_file = open(filepath, "r")
            json_data = json.load(json_file)
            json_file.close()
            return json_data
        else:
            return {}

    @staticmethod
    def list_files(opts, spec, bucket, node, pattern):
        # Stub: falls through and implicitly returns None; see
        # BFDSource.list_files() below for the real implementation.
        file_list = []
        prec_list = []

    @staticmethod
    def write_json_file(parent_dir, filename, output_data):
        filepath = os.path.join(parent_dir, filename)
        json_data = {}
        if os.path.isfile(filepath):
            json_file = open(filepath, "r")
            json_data = json.load(json_file)
            json_file.close()

        for i in range(BFD.NUM_VBUCKET):
            if output_data.get(i):
                json_data[i] = output_data[i]

        json_file = open(filepath, "w")
        json.dump(json_data, json_file, ensure_ascii=False)
        json_file.close()

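    # db_dir() picks the directory where backup data files for this
    # bucket/node should live: an existing pre-3.0 directory wins;
    # otherwise a timestamped session directory is chosen (or named for
    # creation) according to mode -- "full", "diff", or "accu".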
    @staticmethod
    def db_dir(spec, bucket_name, node_name, tmstamp=None, mode=None, new_session=False):
        parent_dir = os.path.normpath(spec) + \
                        '/bucket-' + urllib.quote_plus(bucket_name) + \
                        '/node-' + urllib.quote_plus(node_name)
        if os.path.isdir(parent_dir):
            return parent_dir

        # check 3.0 directory structure
        if not tmstamp:
            tmstamp = time.strftime("%Y-%m-%dT%H%M%SZ", time.gmtime())
        parent_dir = os.path.normpath(spec)
        rootpath, dirs = BFD.find_latest_dir(parent_dir, None)
        if not rootpath or not mode or mode == "full":
            # no backup root exists yet
            path = os.path.join(parent_dir, tmstamp, tmstamp+"-full")
            return BFD.construct_dir(path, bucket_name, node_name)

        # check if any full backup exists
        path, dirs = BFD.find_latest_dir(rootpath, "full")
        if not path:
            path = os.path.join(rootpath, tmstamp+"-full")
            return BFD.construct_dir(path, bucket_name, node_name)
        else:
            # further check the full backup for this bucket and node
            path = BFD.construct_dir(path, bucket_name, node_name)
            if not os.path.isdir(path):
                return path

        if mode.find("diff") >= 0:
            path, dirs = BFD.find_latest_dir(rootpath, "diff")
            if not path or new_session:
                path = os.path.join(rootpath, tmstamp+"-diff")
                return BFD.construct_dir(path, bucket_name, node_name)
            else:
                path = BFD.construct_dir(path, bucket_name, node_name)
                if not os.path.isdir(path):
                    return path
                else:
                    path = os.path.join(rootpath, tmstamp+"-diff")
                    return BFD.construct_dir(path, bucket_name, node_name)
        elif mode.find("accu") >= 0:
            path, dirs = BFD.find_latest_dir(rootpath, "accu")
            if not path or new_session:
                path = os.path.join(rootpath, tmstamp+"-accu")
                return BFD.construct_dir(path, bucket_name, node_name)
            else:
                path = BFD.construct_dir(path, bucket_name, node_name)
                if not os.path.isdir(path):
                    return path
                else:
                    path = os.path.join(rootpath, tmstamp+"-accu")
                    return BFD.construct_dir(path, bucket_name, node_name)
        else:
            return parent_dir

    @staticmethod
    def find_latest_dir(parent_dir, mode):
        all_subdirs = []
        latest_dir = None
        if not os.path.isdir(parent_dir):
            return latest_dir, all_subdirs
        for d in os.listdir(parent_dir):
            if not mode or d.find(mode) >= 0:
                bd = os.path.join(parent_dir, d)
                if os.path.isdir(bd):
                    all_subdirs.append(bd)
        if all_subdirs:
            all_subdirs = sorted(all_subdirs, key=os.path.getmtime, reverse=True)
            latest_dir = all_subdirs[0]
        return latest_dir, all_subdirs

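    # find_seqno() folds together seqno.json, failover.json, and
    # snapshot_markers.json from the latest full backup plus any accu/diff
    # increments, so an incremental backup can resume where the last stopped.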
    @staticmethod
    def find_seqno(opts, spec, bucket_name, node_name, mode):
        seqno = {}
        dep = {}
        dep_list = []
        failover_log = {}
        snapshot_markers = {}
        for i in range(BFD.NUM_VBUCKET):
            seqno[str(i)] = 0
            dep[i] = None
            failover_log[i] = None
            snapshot_markers[i] = None

        file_list = []
        failoverlog_list = []
        snapshot_list = []
        seqno_list = []
        parent_dir = os.path.normpath(spec)

        if mode == "full":
            return seqno, dep_list, failover_log, snapshot_markers
        timedir, latest_dirs = BFD.find_latest_dir(parent_dir, None)
        if not timedir:
            return seqno, dep_list, failover_log, snapshot_markers
        fulldir, latest_dirs = BFD.find_latest_dir(timedir, "full")
        if not fulldir:
            return seqno, dep_list, failover_log, snapshot_markers

        path = BFD.construct_dir(fulldir, bucket_name, node_name)
        if not os.path.isdir(path):
            return seqno, dep_list, failover_log, snapshot_markers

        file_list.extend(recursive_glob(path, 'data-*.cbb'))
        failoverlog_list.extend(recursive_glob(path, 'failover.json'))
        snapshot_list.extend(recursive_glob(path, 'snapshot_markers.json'))
        seqno_list.extend(recursive_glob(path, 'seqno.json'))

        accudir, accu_dirs = BFD.find_latest_dir(timedir, "accu")
        if accudir:
            path = BFD.construct_dir(accudir, bucket_name, node_name)
            if os.path.isdir(path):
                file_list.extend(recursive_glob(path, 'data-*.cbb'))
                failoverlog_list.extend(recursive_glob(path, 'failover.json'))
                snapshot_list.extend(recursive_glob(path, 'snapshot_markers.json'))
                seqno_list.extend(recursive_glob(path, 'seqno.json'))
        if mode.find("diff") >= 0:
            diffdir, diff_dirs = BFD.find_latest_dir(timedir, "diff")
            if diff_dirs:
                for dir in diff_dirs:
                    path = BFD.construct_dir(dir, bucket_name, node_name)
                    if os.path.isdir(path):
                        file_list.extend(recursive_glob(path, 'data-*.cbb'))
                        failoverlog_list.extend(recursive_glob(path, 'failover.json'))
                        snapshot_list.extend(recursive_glob(path, 'snapshot_markers.json'))
                        seqno_list.extend(recursive_glob(path, 'seqno.json'))

        for x in sorted(seqno_list):
            json_file = open(x, "r")
            json_data = json.load(json_file)
            json_file.close()

            for vbid, seq in json_data.iteritems():
                if not seq:
                    continue
                if seqno.get(vbid) < seq:
                    seqno[vbid] = seq

        for log_file in sorted(failoverlog_list):
            json_file = open(log_file, "r")
            json_data = json.load(json_file)
            json_file.close()

            for vbid, flogs in json_data.iteritems():
                if not flogs:
                    continue
                elif vbid not in failover_log.keys():
                    failover_log[vbid] = flogs
                else:
                    for logpair in flogs:
                        if not failover_log[vbid]:
                            failover_log[vbid] = [logpair]
                        elif logpair not in failover_log[vbid]:
                            failover_log[vbid].append(logpair)
        for snapshot in sorted(snapshot_list):
            json_file = open(snapshot, "r")
            json_data = json.load(json_file)
            json_file.close()

            for vbid, markers in json_data.iteritems():
                snapshot_markers[vbid] = markers

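        # NOTE: dep[] was initialized to all-None above and is never
        # reassigned, so the loop below leaves dep_list empty in this revision.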
        for i in range(BFD.NUM_VBUCKET):
            if dep[i] and dep[i] not in dep_list:
                dep_list.append(dep[i])
        return seqno, dep_list, failover_log, snapshot_markers

# --------------------------------------------------

class BFDSource(BFD, pump.Source):
    """Can read from backup-file/directory layout."""

    def __init__(self, opts, spec, source_bucket, source_node,
                 source_map, sink_map, ctl, cur):
        super(BFDSource, self).__init__(opts, spec, source_bucket, source_node,
                                        source_map, sink_map, ctl, cur)
        self.done = False
        self.files = None
        self.cursor_db = None

    @staticmethod
    def can_handle(opts, spec):
        if os.path.isdir(spec):
            dirs = glob.glob(spec + "/bucket-*/node-*/data-*.cbb")
            if dirs:
                return True
            path, dirs = BFD.find_latest_dir(spec, None)
            if path:
                path, dirs = BFD.find_latest_dir(path, "full")
                if path:
                    return glob.glob(path + "*/bucket-*/node-*/data-*.cbb")
        return False

    @staticmethod
    def check(opts, spec):
        spec = os.path.normpath(spec)
        if not os.path.isdir(spec):
            return "error: backup_dir is not a directory: " + spec, None

        buckets = []

        bucket_dirs = glob.glob(spec + "/bucket-*")
        if not bucket_dirs:
            # check 3.0 directory structure
            path, dirs = BFD.find_latest_dir(spec, None)
            if not path:
                return "error: no backup directory found: " + spec, None
            latest_dir, dirs = BFD.find_latest_dir(path, "full")
            bucket_dirs = glob.glob(latest_dir + "/bucket-*")

        for bucket_dir in sorted(bucket_dirs):
            if not os.path.isdir(bucket_dir):
                return "error: not a bucket directory: " + bucket_dir, None

            bucket_name = os.path.basename(bucket_dir)[len("bucket-"):].strip()
            bucket_name = urllib.unquote_plus(bucket_name)
            if not bucket_name:
                return "error: bucket_name too short: " + bucket_dir, None

            bucket = { 'name': bucket_name, 'nodes': [] }
            buckets.append(bucket)

            node_dirs = glob.glob(bucket_dir + "/node-*")
            for node_dir in sorted(node_dirs):
                if not os.path.isdir(node_dir):
                    return "error: not a node directory: " + node_dir, None

                node_name = os.path.basename(node_dir)[len("node-"):].strip()
                node_name = urllib.unquote_plus(node_name)
                if not node_name:
                    return "error: node_name too short: " + node_dir, None

                bucket['nodes'].append({ 'hostname': node_name })

        return 0, { 'spec': spec,
                    'buckets': buckets }

    @staticmethod
    def provide_design(opts, source_spec, source_bucket, source_map):
        fname = BFD.design_path(source_spec, source_bucket['name'])
        if os.path.exists(fname):
            try:
                f = open(fname, 'r')
                d = f.read()
                f.close()
                return 0, d
            except IOError, e:
                return ("error: could not read design: %s" +
                        "; exception: %s") % (fname, e), None
        return 0, None

    def provide_batch(self):
        if self.done:
            return 0, None

        batch = pump.Batch(self)

        batch_max_size = self.opts.extra['batch_max_size']
        batch_max_bytes = self.opts.extra['batch_max_bytes']

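        # One SELECT per schema version: s[0] for older backup files,
        # s[1] for 3.0 files; connect_db() returns the matching index.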
370        s = ["SELECT cmd, vbucket_id, key, flg, exp, cas, meta, val FROM cbb_msg",
371             "SELECT cmd, vbucket_id, key, flg, exp, cas, meta, val, seqno, dtype, meta_size FROM cbb_msg"]
372
373        if self.files is None: # None != [], as self.files will shrink to [].
374            g =  glob.glob(BFD.db_dir(self.spec, self.bucket_name(), self.node_name()) + "/data-*.cbb")
375            if not g:
376                #check 3.0 file structure
377                rv, file_list = BFDSource.list_files(self.opts,
378                                                     self.spec,
379                                                     self.bucket_name(),
380                                                     self.node_name(),
381                                                     "data-*.cbb")
382                if rv != 0:
383                    return rv, None
384                from_date = getattr(self.opts, "from_date", None)
385                if from_date:
386                    from_date = datetime.datetime.strptime(from_date, "%Y-%m-%d")
387
388                to_date = getattr(self.opts, "to_date", None)
389                if to_date:
390                    to_date = datetime.datetime.strptime(to_date, "%Y-%m-%d")
391                g = []
392                for f in file_list:
393                    mtime = datetime.datetime.fromtimestamp(os.path.getmtime(f))
394                    if (not from_date or mtime >= from_date) and (not to_date or mtime <= to_date):
395                        g.append(f)
396            self.files = sorted(g)
        try:
            ver = 0
            while (not self.done and
                   batch.size() < batch_max_size and
                   batch.bytes < batch_max_bytes):
                if self.cursor_db is None:
                    if not self.files:
                        self.done = True
                        return 0, batch

                    rv, db, ver = connect_db(self.files[0], self.opts, CBB_VERSION)
                    if rv != 0:
                        return rv, None
                    self.files = self.files[1:]

                    cursor = db.cursor()
                    cursor.execute(s[ver])

                    self.cursor_db = (cursor, db)

                cursor, db = self.cursor_db

                row = cursor.fetchone()
                if row:
                    vbucket_id = row[1]
                    key = row[2]
                    val = row[7]

                    if self.skip(key, vbucket_id):
                        continue
                    msg = (row[0], row[1], row[2], row[3], row[4],
                           int(row[5]), # CAS as 64-bit integer not string.
                           row[6], # revid as 64-bit integer too
                           row[7])
                    if ver == 1:
                        msg = msg + (row[8], row[9], row[10])
                    else:
                        msg = msg + (0, 0, 0)
                    batch.append(msg, len(val))
                else:
                    if self.cursor_db:
                        self.cursor_db[0].close()
                        self.cursor_db[1].close()
                    self.cursor_db = None

            return 0, batch

        except Exception, e:
            self.done = True
            if self.cursor_db:
                self.cursor_db[0].close()
                self.cursor_db[1].close()
            self.cursor_db = None

            return "error: exception reading backup file: " + str(e), None

    @staticmethod
    def total_msgs(opts, source_bucket, source_node, source_map):
        t = 0
        file_list = glob.glob(BFD.db_dir(
                                 source_map['spec'],
                                 source_bucket['name'],
                                 source_node['hostname']) + "/data-*.cbb")
        if not file_list:
            # check 3.0 directory structure
            rv, file_list = BFDSource.list_files(opts,
                                        source_map['spec'],
                                        source_bucket['name'],
                                        source_node['hostname'],
                                        "data-*.cbb")
            if rv != 0:
                return rv, None

        for x in sorted(file_list):
            rv, db, ver = connect_db(x, opts, CBB_VERSION)
            if rv != 0:
                return rv, None

            cur = db.cursor()
            # TODO: (1) BFDSource - COUNT(*) is not indexed.
            cur.execute("SELECT COUNT(*) FROM cbb_msg;")
            t = t + cur.fetchone()[0]
            cur.close()
            db.close()

        return 0, t

    @staticmethod
    def list_files(opts, spec, bucket, node, pattern):
        file_list = []
        prec_list = []
        path, dirs = BFD.find_latest_dir(spec, None)
        if not path:
            return "error: no valid data in path: %s" % spec, None

        path, dirs = BFD.find_latest_dir(path, None)
        if not path:
            return 0, file_list

        for dir in dirs:
            latest_dir = BFD.construct_dir(dir, bucket, node)
            file_list.extend(glob.glob(os.path.join(latest_dir, pattern)))
            for p in BFD.get_predecessors(latest_dir):
                prec_list.append(os.path.dirname(p))
            if len(prec_list) > 0:
                break

        while len(prec_list) > 0:
            deps = glob.glob(os.path.join(prec_list[0], pattern))
            for d in deps:
                if d not in file_list:
                    file_list.append(d)
            for p in BFD.get_predecessors(prec_list[0]):
                dirname = os.path.dirname(p)
                if dirname not in prec_list:
                    prec_list.append(dirname)
            prec_list = prec_list[1:]

        return 0, file_list

# --------------------------------------------------

class BFDSink(BFD, pump.Sink):
    """Can write to backup-file/directory layout."""

    def __init__(self, opts, spec, source_bucket, source_node,
                 source_map, sink_map, ctl, cur):
        super(BFDSink, self).__init__(opts, spec, source_bucket, source_node,
                                      source_map, sink_map, ctl, cur)
        self.mode = "full"
        self.init_worker(BFDSink.run)

    @staticmethod
    def run(self):
        """Worker thread to asynchronously store incoming batches into db."""
532        s = "INSERT INTO cbb_msg (cmd, vbucket_id, key, flg, exp, cas, meta, val, seqno, \
533            dtype, meta_size) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
534        db = None
535        cbb = 0       # Current cbb file NUM, like data-NUM.cbb.
536        cbb_bytes = 0 # Current cbb msg value bytes total.
537        db_dir = None
538        cbb_max_bytes = \
539            self.opts.extra.get("cbb_max_mb", 100000) * 1024 * 1024
540        _, dep, _, _ = BFD.find_seqno(self.opts,
541                                                  self.spec,
542                                                  self.bucket_name(),
543                                                  self.node_name(),
544                                                  self.mode)
545        seqno_map = {}
546        for i in range(BFD.NUM_VBUCKET):
547            seqno_map[i] = 0
548        while not self.ctl['stop']:
549            batch, future = self.pull_next_batch()
550            if not batch:
551                if db:
552                    db.close()
553                return self.future_done(future, 0)
554
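            # Rotate to a fresh data-NUM.cbb file once the current one
            # reaches cbb_max_mb (default 100000 MB).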
            if db and cbb_bytes >= cbb_max_bytes:
                db.close()
                db = None
                cbb += 1
                cbb_bytes = 0
                db_dir = None

            if not db:
                rv, db, db_dir = self.create_db(cbb)
                if rv != 0:
                    return self.future_done(future, rv)

                meta_file = os.path.join(db_dir, "meta.json")
                json_file = open(meta_file, "w")
                json.dump({'pred': dep}, json_file, ensure_ascii=False)
                json_file.close()

            if (self.bucket_name(), self.node_name()) in self.cur['failoverlog']:
                BFD.write_json_file(db_dir,
                                    "failover.json",
                                    self.cur['failoverlog'][(self.bucket_name(), self.node_name())])
            if (self.bucket_name(), self.node_name()) in self.cur['snapshot']:
                BFD.write_json_file(db_dir,
                                    "snapshot_markers.json",
                                    self.cur['snapshot'][(self.bucket_name(), self.node_name())])
            try:
                c = db.cursor()

                for msg in batch.msgs:
                    cmd, vbucket_id, key, flg, exp, cas, meta, val, seqno, dtype, nmeta = msg
                    if self.skip(key, vbucket_id):
                        continue
                    if cmd not in [couchbaseConstants.CMD_TAP_MUTATION,
                                   couchbaseConstants.CMD_TAP_DELETE,
                                   couchbaseConstants.CMD_DCP_MUTATION,
                                   couchbaseConstants.CMD_DCP_DELETE]:
                        if db:
                            db.close()
                        return self.future_done(future,
                                                "error: BFDSink bad cmd: " +
                                                str(cmd))
                    c.execute(s, (cmd, vbucket_id,
                                  sqlite3.Binary(key),
                                  flg, exp, str(cas),
                                  sqlite3.Binary(str(meta)),
                                  sqlite3.Binary(val),
                                  seqno,
                                  dtype,
                                  nmeta))
                    cbb_bytes += len(val)
                    if seqno_map[vbucket_id] < seqno:
                        seqno_map[vbucket_id] = seqno
                db.commit()
                BFD.write_json_file(db_dir, "seqno.json", seqno_map)
                self.future_done(future, 0) # No return to keep looping.

            except sqlite3.Error, e:
                return self.future_done(future, "error: db error: " + str(e))
            except Exception, e:
                return self.future_done(future, "error: db exception: " + str(e))

    @staticmethod
    def can_handle(opts, spec):
        spec = os.path.normpath(spec)
        return (os.path.isdir(spec) or (not os.path.exists(spec) and
                                        os.path.isdir(os.path.dirname(spec))))

    @staticmethod
    def check(opts, spec, source_map):
        # TODO: (2) BFDSink - check disk space.
        # TODO: (2) BFDSink - check should report on pre-creatable directories.

        spec = os.path.normpath(spec)

        # If the directory already exists, it just needs to be writable.
        if os.path.exists(spec):
            if not os.path.isdir(spec):
                return "error: backup directory is not a directory: " + spec, None
            if not os.access(spec, os.W_OK):
                return "error: backup directory is not writable: " + spec, None
            return 0, None

        # Otherwise, its parent directory must exist and be writable.
        parent_dir = os.path.dirname(spec)

        if not os.path.exists(parent_dir):
            return "error: missing parent directory: " + parent_dir, None
        if not os.path.isdir(parent_dir):
            return "error: parent directory is not a directory: " + parent_dir, None
        if not os.access(parent_dir, os.W_OK):
            return "error: parent directory is not writable: " + parent_dir, None
        return 0, None


    @staticmethod
    def consume_design(opts, sink_spec, sink_map,
                       source_bucket, source_map, source_design):
        if source_design:
            fname = BFD.design_path(sink_spec,
                                    source_bucket['name'])
            try:
                rv = pump.mkdirs(fname)
                if rv:
                    return rv, None
                f = open(fname, 'w')
                f.write(source_design)
                f.close()
            except IOError, e:
                return ("error: could not write design: %s" +
                        "; exception: %s") % (fname, e), None
        return 0

    def consume_batch_async(self, batch):
        return self.push_next_batch(batch, pump.SinkBatchFuture(self, batch))

    def create_db(self, num):
        rv, dir = self.mkdirs()
        if rv != 0:
            return rv, None, None

        path = dir + "/data-%s.cbb" % (string.rjust(str(num), 4, '0'))
        rv, db = create_db(path, self.opts)
        if rv != 0:
            return rv, None, None

        try:
            # Serialize a copy of source_map with its spec_parts entry removed.
            tmp_map = copy.deepcopy(self.source_map)
            if 'spec_parts' in tmp_map:
                del tmp_map['spec_parts']
            cur = db.cursor()
            cur.execute("INSERT INTO cbb_meta (key, val) VALUES (?, ?)",
                        ("source_bucket.json",
                         json.dumps(cleanse(self.source_bucket))))
            cur.execute("INSERT INTO cbb_meta (key, val) VALUES (?, ?)",
                        ("source_node.json",
                         json.dumps(cleanse(self.source_node))))
            cur.execute("INSERT INTO cbb_meta (key, val) VALUES (?, ?)",
                        ("source_map.json",
                         json.dumps(cleanse(tmp_map))))
            cur.execute("INSERT INTO cbb_meta (key, val) VALUES (?, ?)",
                        ("start.datetime", time.strftime("%Y/%m/%d-%H:%M:%S")))
            db.commit()
        except sqlite3.Error, e:
            return "error: create_db error: " + str(e), None, None
        except Exception, e:
            return "error: create_db exception: " + str(e), None, None

        return 0, db, dir

    def mkdirs(self):
        """Make directories, if not already, with structure like...
           <spec>/
             YYYY-MM-DDThhmmssZ/
                YYYY-MM-DDThhmmssZ-full/
                   bucket-<BUCKETNAME>/
                     design.json
                     node-<NODE>/
                       data-<XXXX>.cbb
                YYYY-MM-DDThhmmssZ-diff/
                   bucket-<BUCKETNAME>/
                     design.json
                     node-<NODE>/
                       data-<XXXX>.cbb
        """
        """CBSE-1052: There appears to be a race condition in os.mkdir. Suppose
           two or more threads simultaneously try to create the same directory,
           or different directories with a common non-existent ancestor. Each
           checks that the directory doesn't exist, then each invokes os.mkdir.
           All but one of these will throw OSError due to the underlying EEXIST
           system error."""

        spec = os.path.normpath(self.spec)
        if not os.path.isdir(spec):
            try:
                os.mkdir(spec)
            except OSError, e:
                if not os.path.isdir(spec):
                    return "error: could not mkdir: %s; exception: %s" % (spec, e), None

        new_session = self.ctl['new_session']
        self.ctl['new_session'] = False
        d = BFD.db_dir(self.spec,
                       self.bucket_name(),
                       self.node_name(),
                       self.ctl['new_timestamp'],
                       getattr(self.opts, "mode", "diff"),
                       new_session)
        if not os.path.isdir(d):
            try:
                os.makedirs(d)
            except OSError, e:
                if not os.path.isdir(d):
                    return "error: could not mkdirs: %s; exception: %s" % (d, e), None
        return 0, d

# --------------------------------------------------

def create_db(db_path, opts):
    try:
        logging.debug("  create_db: " + db_path)

        rv, db, ver = connect_db(db_path, opts, [0])
        if rv != 0:
            logging.debug("create_db: connect_db failed: " + db_path)
            return rv, None

        # The cas column is type text, not integer, because sqlite
        # integer is 63-bits instead of 64-bits.
        db.executescript("""
                  BEGIN;
                  CREATE TABLE cbb_msg
                     (cmd integer,
                      vbucket_id integer,
                      key blob,
                      flg integer,
                      exp integer,
                      cas text,
                      meta blob,
                      val blob,
                      seqno integer,
                      dtype integer,
                      meta_size integer);
                  CREATE TABLE cbb_meta
                     (key text,
                      val blob);
                  pragma user_version=%s;
                  COMMIT;
                """ % (CBB_VERSION[1]))

        return 0, db

    except Exception, e:
        return "error: create_db exception: " + str(e), None

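# connect_db() checks the file's "pragma user_version" against the allowed
# list and returns (rv, db, index), where index selects the schema variant
# (create_db() above passes [0] to require a freshly created, empty file).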
def connect_db(db_path, opts, version):
    try:
        # TODO: (1) BFD - connect_db - use pragma page_size.
        # TODO: (1) BFD - connect_db - use pragma max_page_count.
        # TODO: (1) BFD - connect_db - use pragma journal_mode.

        logging.debug("  connect_db: " + db_path)

        db = sqlite3.connect(db_path)
        db.text_factory = str

        cur = db.execute("pragma user_version").fetchall()[0][0]
        if cur not in version:
            logging.debug("unexpected db user version: " + db_path)
            return "error: unexpected db user version: " + \
                str(cur) + " vs " + str(version) + \
                ", maybe a backup directory created by older releases is reused", \
                None, None

        return 0, db, version.index(cur)

    except Exception, e:
        return "error: connect_db exception: " + str(e), None, None

def cleanse(d):
    """Elide passwords from a hierarchy of dicts/lists."""
    return cleanse_helper(copy.deepcopy(d))

def cleanse_helper(d):
    """Recursively, destructively elide passwords from a hierarchy of dicts/lists."""
    if type(d) is list:
        for x in d:
            cleanse_helper(x)
    elif type(d) is dict:
        for k, v in d.iteritems():
            if "assword" in k:
                d[k] = '<...ELIDED...>'
            else:
                d[k] = cleanse_helper(v)
    return d

def recursive_glob(rootdir='.', pattern='*'):
    return [os.path.join(rootdir, filename)
            for rootdir, dirnames, filenames in os.walk(rootdir)
            for filename in filenames
            if fnmatch.fnmatch(filename, pattern)]

def local_to_utc(t):
    secs = time.mktime(t.timetuple())
    return datetime.datetime.utcfromtimestamp(secs)
839