#!/usr/bin/env python

import copy
import errno
import glob
import logging
import os
import json
import string
import sys
import datetime
import time
import urllib
import fnmatch

import couchbaseConstants
import pump

import_stmts = (
    'from pysqlite2 import dbapi2 as sqlite3',
    'import sqlite3',
)
for status, stmt in enumerate(import_stmts):
    try:
        exec stmt
        break
    except ImportError:
        status = None
if status is None:
    sys.exit("Error: could not import sqlite3 module")
CBB_VERSION = [2004, 2014, 2015] # sqlite pragma user version.
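# The position of a file's user_version in CBB_VERSION is its schema level:
# connect_db() returns version.index(cur), and BFDSource.provide_batch() uses
# that index to pick the SELECT statement matching the file's column set.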

class BFD:
    """Mixin for backup-file/directory EndPoint helper methods."""
    NUM_VBUCKET = 1024
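    # Couchbase splits each bucket into 1024 vbuckets (the server's default
    # vbucket count), so all per-vbucket bookkeeping below is sized to this.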

    def bucket_name(self):
        return self.source_bucket['name']

    def node_name(self):
        return self.source_node['hostname']

    @staticmethod
    def design_path(spec, bucket_name):
        bucket_path = os.path.normpath(spec) + "/bucket-" + urllib.quote_plus(bucket_name).encode('ascii')
        if os.path.isdir(bucket_path):
            return bucket_path + '/design.json'
        else:
            path, dirs = BFD.find_latest_dir(spec, None)
            if path:
                path, dirs = BFD.find_latest_dir(path, None)
                if path:
                    return path + "/bucket-" + urllib.quote_plus(bucket_name).encode('ascii') + '/design.json'
        return bucket_path + '/design.json'

    @staticmethod
    def index_path(spec, bucket_name):
        bucket_path = os.path.normpath(spec) + "/bucket-" + urllib.quote_plus(bucket_name).encode('ascii')
        if os.path.isdir(bucket_path):
            return bucket_path + '/index.json'
        else:
            path, dirs = BFD.find_latest_dir(spec, None)
            if path:
                path, dirs = BFD.find_latest_dir(path, None)
                if path:
                    return path + "/bucket-" + urllib.quote_plus(bucket_name).encode('ascii') + '/index.json'
        return bucket_path + '/index.json'

    @staticmethod
    def construct_dir(parent, bucket_name, node_name):
        return os.path.join(parent,
                    "bucket-" + urllib.quote_plus(bucket_name).encode('ascii'),
                    "node-" + urllib.quote_plus(node_name).encode('ascii'))

    @staticmethod
    def check_full_dbfiles(parent_dir):
        return glob.glob(os.path.join(parent_dir, "data-*.cbb"))

    @staticmethod
    def get_predecessors(parent_dir):
        try:
            json_file = open(os.path.join(parent_dir, "meta.json"), "r")
            json_data = json.load(json_file)
            json_file.close()
            return json_data["pred"]
        except IOError:
            return []

    @staticmethod
    def get_failover_log(parent_dir):
        try:
            filepath = os.path.join(parent_dir, "failover.json")
            if os.path.isfile(filepath):
                json_file = open(filepath, "r")
                json_data = json.load(json_file)
                json_file.close()
                return json_data
            else:
                return {}
        except IOError:
            return {}

    @staticmethod
    def list_files(opts, spec, bucket, node, pattern):
        # Stub: BFDSource.list_files() supplies the real implementation for
        # the 3.0 timestamped directory layout.
        file_list = []
        prec_list = []

    @staticmethod
    def write_json_file(parent_dir, filename, output_data):
        filepath = os.path.join(parent_dir, filename)
        json_data = {}
        if os.path.isfile(filepath):
            # Merge into existing metadata generated by a previous batch run.
            try:
                json_file = open(filepath, "r")
                json_data = json.load(json_file)
                json_file.close()
            except IOError:
                pass

        for i in range(BFD.NUM_VBUCKET):
            if output_data.get(i):
                str_index = str(i)
                historic_data_exist = json_data.get(str_index)
                if historic_data_exist:
                    # Historic data shares the same data type as the incoming
                    # data, so checking the historic entry's type suffices.
                    if isinstance(json_data[str_index], int):
                        # For seqno, keep the highest value.
                        if json_data[str_index] < output_data[i]:
                            json_data[str_index] = output_data[i]
                    elif isinstance(json_data[str_index], list):
                        # For each vbucket, keep the superset of its references.
                        if len(json_data[str_index]) < len(output_data[i]):
                            json_data[str_index] = output_data[i]
                else:
                    # First sighting; record the incoming entry.
                    json_data[str_index] = output_data[i]

        json_file = open(filepath, "w")
        json.dump(json_data, json_file, ensure_ascii=False)
        json_file.close()

    @staticmethod
    def db_dir(spec, bucket_name, node_name, tmstamp=None, mode=None, new_session=False):
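        # Prefer the pre-3.0 flat layout (<spec>/bucket-X/node-Y) when it
        # already exists; otherwise resolve, or build a new path for, the 3.0
        # timestamped layout with its -full/-diff/-accu session directories.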
        parent_dir = os.path.normpath(spec) + \
                        '/bucket-' + urllib.quote_plus(bucket_name).encode('ascii') + \
                        '/node-' + urllib.quote_plus(node_name).encode('ascii')
        if os.path.isdir(parent_dir):
            return parent_dir

        # check 3.0 directory structure
        if not tmstamp:
            tmstamp = time.strftime("%Y-%m-%dT%H%M%SZ", time.gmtime())
        parent_dir = os.path.normpath(spec)
        rootpath, dirs = BFD.find_latest_dir(parent_dir, None)
        if not rootpath or not mode or mode == "full":
            # no backup root exists yet, or a full backup was requested
            path = os.path.join(parent_dir, tmstamp, tmstamp+"-full")
            return BFD.construct_dir(path, bucket_name, node_name)

        # check if any full backup exists
        path, dirs = BFD.find_latest_dir(rootpath, "full")
        if not path:
            path = os.path.join(rootpath, tmstamp+"-full")
            return BFD.construct_dir(path, bucket_name, node_name)
        else:
            # further check full backup for this bucket and node
            path = BFD.construct_dir(path, bucket_name, node_name)
            if not os.path.isdir(path):
                return path

        if mode.find("diff") >= 0:
            path, dirs = BFD.find_latest_dir(rootpath, "diff")
            if not path or new_session:
                path = os.path.join(rootpath, tmstamp+"-diff")
                return BFD.construct_dir(path, bucket_name, node_name)
            else:
                path = BFD.construct_dir(path, bucket_name, node_name)
                if not os.path.isdir(path):
                    return path
                else:
                    path = os.path.join(rootpath, tmstamp+"-diff")
                    return BFD.construct_dir(path, bucket_name, node_name)
        elif mode.find("accu") >= 0:
            path, dirs = BFD.find_latest_dir(rootpath, "accu")
            if not path or new_session:
                path = os.path.join(rootpath, tmstamp+"-accu")
                return BFD.construct_dir(path, bucket_name, node_name)
            else:
                path = BFD.construct_dir(path, bucket_name, node_name)
                if not os.path.isdir(path):
                    return path
                else:
                    path = os.path.join(rootpath, tmstamp+"-accu")
                    return BFD.construct_dir(path, bucket_name, node_name)
        else:
            return parent_dir

    @staticmethod
    def find_latest_dir(parent_dir, mode):
        all_subdirs = []
        latest_dir = None
        if not os.path.isdir(parent_dir):
            return latest_dir, all_subdirs
        for d in os.listdir(parent_dir):
            if not mode or d.find(mode) >= 0:
                bd = os.path.join(parent_dir, d)
                if os.path.isdir(bd):
                    all_subdirs.append(bd)
        if all_subdirs:
            all_subdirs = sorted(all_subdirs, key=os.path.getmtime, reverse=True)
            latest_dir = all_subdirs[0]
        return latest_dir, all_subdirs

    @staticmethod
    def find_seqno(opts, spec, bucket_name, node_name, mode):
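        # For an incremental run, scan the sessions under the latest backup
        # root and gather, per vbucket: the highest seqno seen, the union of
        # failover log entries, and the most recent snapshot markers.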
        seqno = {}
        dep = {}
        dep_list = []
        failover_log = {}
        snapshot_markers = {}
        for i in range(BFD.NUM_VBUCKET):
            seqno[str(i)] = 0
            dep[i] = None
            failover_log[i] = None
            snapshot_markers[i] = None

        file_list = []
        failoverlog_list = []
        snapshot_list = []
        seqno_list = []
        parent_dir = os.path.normpath(spec)

        if mode == "full":
            return seqno, dep_list, failover_log, snapshot_markers
        timedir, latest_dirs = BFD.find_latest_dir(parent_dir, None)
        if not timedir:
            return seqno, dep_list, failover_log, snapshot_markers
        fulldir, latest_dirs = BFD.find_latest_dir(timedir, "full")
        if not fulldir:
            return seqno, dep_list, failover_log, snapshot_markers

        path = BFD.construct_dir(fulldir, bucket_name, node_name)
        if not os.path.isdir(path):
            return seqno, dep_list, failover_log, snapshot_markers

        file_list.extend(recursive_glob(path, 'data-*.cbb'))
        failoverlog_list.extend(recursive_glob(path, 'failover.json'))
        snapshot_list.extend(recursive_glob(path, 'snapshot_markers.json'))
        seqno_list.extend(recursive_glob(path, 'seqno.json'))

        accudir, accu_dirs = BFD.find_latest_dir(timedir, "accu")
        if accudir:
            path = BFD.construct_dir(accudir, bucket_name, node_name)
            if os.path.isdir(path):
                file_list.extend(recursive_glob(path, 'data-*.cbb'))
                failoverlog_list.extend(recursive_glob(path, 'failover.json'))
                snapshot_list.extend(recursive_glob(path, 'snapshot_markers.json'))
                seqno_list.extend(recursive_glob(path, 'seqno.json'))
        if mode.find("diff") >= 0:
            diffdir, diff_dirs = BFD.find_latest_dir(timedir, "diff")
            if diff_dirs:
                for dir in diff_dirs:
                    path = BFD.construct_dir(dir, bucket_name, node_name)
                    if os.path.isdir(path):
                        file_list.extend(recursive_glob(path, 'data-*.cbb'))
                        failoverlog_list.extend(recursive_glob(path, 'failover.json'))
                        snapshot_list.extend(recursive_glob(path, 'snapshot_markers.json'))
                        seqno_list.extend(recursive_glob(path, 'seqno.json'))

        for x in sorted(seqno_list):
            try:
                json_file = open(x, "r")
                json_data = json.load(json_file)
                json_file.close()

                for vbid, seq in json_data.iteritems():
                    if not seq:
                        continue
                    if seqno.get(vbid) < seq:
                        seqno[vbid] = seq
            except IOError:
                pass

        for log_file in sorted(failoverlog_list):
            try:
                json_file = open(log_file, "r")
                json_data = json.load(json_file)
                json_file.close()

                for vbid, flogs in json_data.iteritems():
                    if not flogs:
                        continue
                    elif vbid not in failover_log.keys():
                        failover_log[vbid] = flogs
                    else:
                        for logpair in flogs:
                            if not failover_log[vbid]:
                                failover_log[vbid] = [logpair]
                            elif logpair not in failover_log[vbid]:
                                failover_log[vbid].append(logpair)
            except IOError:
                pass

        for snapshot in sorted(snapshot_list):
            try:
                json_file = open(snapshot, "r")
                json_data = json.load(json_file)
                json_file.close()

                for vbid, markers in json_data.iteritems():
                    snapshot_markers[vbid] = markers
            except IOError:
                pass

        for i in range(BFD.NUM_VBUCKET):
            if dep[i] and dep[i] not in dep_list:
                dep_list.append(dep[i])
        return seqno, dep_list, failover_log, snapshot_markers

# --------------------------------------------------

class BFDSource(BFD, pump.Source):
    """Can read from backup-file/directory layout."""

    def __init__(self, opts, spec, source_bucket, source_node,
                 source_map, sink_map, ctl, cur):
        super(BFDSource, self).__init__(opts, spec, source_bucket, source_node,
                                        source_map, sink_map, ctl, cur)
        self.done = False
        self.files = None
        self.cursor_db = None

    @staticmethod
    def can_handle(opts, spec):
        if os.path.isdir(spec):
            dirs = glob.glob(spec + "/bucket-*/node-*/data-*.cbb")
            if dirs:
                return True
            path, dirs = BFD.find_latest_dir(spec, None)
            if path:
                path, dirs = BFD.find_latest_dir(path, "full")
                if path:
                    return glob.glob(path + "*/bucket-*/node-*/data-*.cbb")
        return False

    @staticmethod
    def check(opts, spec):
        spec = os.path.normpath(spec)
        if not os.path.isdir(spec):
            return "error: backup_dir is not a directory: " + spec, None

        buckets = []

        bucket_dirs = glob.glob(spec + "/bucket-*")
        if not bucket_dirs:
            # check 3.0 directory structure
            path, dirs = BFD.find_latest_dir(spec, None)
            if not path:
                return "error: no backup directory found: " + spec, None
            latest_dir, dir = BFD.find_latest_dir(path, "full")
            if not latest_dir:
                return "error: no valid backup directory found: " + path, None
            bucket_dirs = glob.glob(latest_dir + "/bucket-*")

        for bucket_dir in sorted(bucket_dirs):
            if not os.path.isdir(bucket_dir):
                return "error: not a bucket directory: " + bucket_dir, None

            bucket_name = os.path.basename(bucket_dir)[len("bucket-"):].strip()
            bucket_name = urllib.unquote_plus(bucket_name).encode('ascii')
            if not bucket_name:
                return "error: bucket_name too short: " + bucket_dir, None

            bucket = { 'name': bucket_name, 'nodes': [] }
            buckets.append(bucket)

            node_dirs = glob.glob(bucket_dir + "/node-*")
            for node_dir in sorted(node_dirs):
                if not os.path.isdir(node_dir):
                    return "error: not a node directory: " + node_dir, None

                node_name = os.path.basename(node_dir)[len("node-"):].strip()
                node_name = urllib.unquote_plus(node_name).encode('ascii')
                if not node_name:
                    return "error: node_name too short: " + node_dir, None

                bucket['nodes'].append({ 'hostname': node_name })

        return 0, { 'spec': spec,
                    'buckets': buckets }

    @staticmethod
    def provide_design(opts, source_spec, source_bucket, source_map):
        fname = BFD.design_path(source_spec, source_bucket['name'])
        if os.path.exists(fname):
            try:
                f = open(fname, 'r')
                d = f.read()
                f.close()
                return 0, d
            except IOError, e:
                return ("error: could not read design: %s" +
                        "; exception: %s") % (fname, e), None
        return 0, None

    @staticmethod
    def provide_index(opts, source_spec, source_bucket, source_map):
        fname = BFD.index_path(source_spec, source_bucket['name'])
        if os.path.exists(fname):
            try:
                f = open(fname, 'r')
                d = f.read()
                f.close()
                return 0, d
            except IOError, e:
                return ("error: could not read index: %s" +
                        "; exception: %s") % (fname, e), None
        return 0, None

    def get_conflict_resolution_type(self):
        rv, files = BFDSource.list_files(self.opts, self.source_map['spec'],
                                         self.source_bucket['name'],
                                         self.source_node['hostname'],
                                         "meta.json")
        if rv != 0 or not files:
            return "seqno"
        try:
            json_file = open(files[0], "r")
            json_data = json.load(json_file)
            json_file.close()
            if "conflict_resolution_type" in json_data:
                return json_data["conflict_resolution_type"]
            return "seqno"
        except IOError:
            return "seqno"

    def provide_batch(self):
        if self.done:
            return 0, None

        batch = pump.Batch(self)

        batch_max_size = self.opts.extra['batch_max_size']
        batch_max_bytes = self.opts.extra['batch_max_bytes']

        s = ["SELECT cmd, vbucket_id, key, flg, exp, cas, meta, val FROM cbb_msg",
             "SELECT cmd, vbucket_id, key, flg, exp, cas, meta, val, seqno, dtype, meta_size FROM cbb_msg",
             "SELECT cmd, vbucket_id, key, flg, exp, cas, meta, val, seqno, dtype, meta_size, conf_res FROM cbb_msg"]
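        # s is indexed by the schema level connect_db() derives from a file's
        # pragma user_version, so each .cbb file is read with the column list
        # it was written with.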

        if self.files is None: # None != [], as self.files will shrink to [].
            g = glob.glob(BFD.db_dir(self.spec, self.bucket_name(), self.node_name()) + "/data-*.cbb")
            if not g:
                # check 3.0 file structure
                rv, file_list = BFDSource.list_files(self.opts,
                                                     self.spec,
                                                     self.bucket_name(),
                                                     self.node_name(),
                                                     "data-*.cbb")
                if rv != 0:
                    return rv, None
                from_date = getattr(self.opts, "from_date", None)
                if from_date:
                    from_date = datetime.datetime.strptime(from_date, "%Y-%m-%d")

                to_date = getattr(self.opts, "to_date", None)
                if to_date:
                    to_date = datetime.datetime.strptime(to_date, "%Y-%m-%d")
                g = []
                for f in file_list:
                    mtime = datetime.datetime.fromtimestamp(os.path.getmtime(f))
                    if (not from_date or mtime >= from_date) and (not to_date or mtime <= to_date):
                        g.append(f)
            self.files = sorted(g)
        try:
            ver = 0
            while (not self.done and
                   batch.size() < batch_max_size and
                   batch.bytes < batch_max_bytes):
                if self.cursor_db is None:
                    if not self.files:
                        self.done = True
                        return 0, batch

                    rv, db, ver = connect_db(self.files[0], self.opts, CBB_VERSION)
                    if rv != 0:
                        return rv, None
                    self.files = self.files[1:]

                    cursor = db.cursor()
                    cursor.execute(s[ver])

                    self.cursor_db = (cursor, db)

                cursor, db = self.cursor_db

                row = cursor.fetchone()
                if row:
                    vbucket_id = row[1]
                    key = row[2]
                    val = row[7]

                    if self.skip(key, vbucket_id):
                        continue
                    msg = (row[0], row[1], row[2], row[3], row[4],
                           int(row[5]), # CAS as 64-bit integer not string.
                           row[6], # revid as 64-bit integer too
                           row[7])
                    if ver == 2:
                        msg = msg + (row[8], row[9], row[10], row[11])
                    elif ver == 1:
                        msg = msg + (row[8], row[9], row[10], 0)
                    else:
                        msg = msg + (0, 0, 0, 0)
                    batch.append(msg, len(val))
                else:
                    if self.cursor_db:
                        self.cursor_db[0].close()
                        self.cursor_db[1].close()
                    self.cursor_db = None

            return 0, batch

        except Exception, e:
            self.done = True
            if self.cursor_db:
                self.cursor_db[0].close()
                self.cursor_db[1].close()
            self.cursor_db = None

            return "error: exception reading backup file: " + str(e), None

    @staticmethod
    def total_msgs(opts, source_bucket, source_node, source_map):
        t = 0
        file_list = glob.glob(BFD.db_dir(
                                 source_map['spec'],
                                 source_bucket['name'],
                                 source_node['hostname']) + "/data-*.cbb")
        if not file_list:
            # check 3.0 directory structure
            rv, file_list = BFDSource.list_files(opts,
                                        source_map['spec'],
                                        source_bucket['name'],
                                        source_node['hostname'],
                                        "data-*.cbb")
            if rv != 0:
                return rv, None

        for x in sorted(file_list):
            rv, db, ver = connect_db(x, opts, CBB_VERSION)
            if rv != 0:
                return rv, None

            cur = db.cursor()
            # TODO: (1) BFDSource - COUNT(*) is not indexed.
            cur.execute("SELECT COUNT(*) FROM cbb_msg;")
            t = t + cur.fetchone()[0]
            cur.close()
            db.close()

        return 0, t

    @staticmethod
    def list_files(opts, spec, bucket, node, pattern):
        file_list = []
        prec_list = []
        path, dirs = BFD.find_latest_dir(spec, None)
        if not path:
            return "error: no valid data in path: %s" % spec, None

        path, dirs = BFD.find_latest_dir(path, None)
        if not path:
            return 0, file_list

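        # Walk the "pred" links recorded in each session's meta.json,
        # breadth-first, so files from every predecessor directory are
        # collected exactly once.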
        for dir in dirs:
            latest_dir = BFD.construct_dir(dir, bucket, node)
            file_list.extend(glob.glob(os.path.join(latest_dir, pattern)))
            for p in BFD.get_predecessors(latest_dir):
                prec_list.append(os.path.dirname(p))
            if len(prec_list) > 0:
                break

        while len(prec_list) > 0:
            deps = glob.glob(os.path.join(prec_list[0], pattern))
            for d in deps:
                if d not in file_list:
                    file_list.append(d)
            for p in BFD.get_predecessors(prec_list[0]):
                dirname = os.path.dirname(p)
                if dirname not in prec_list:
                    prec_list.append(dirname)
            prec_list = prec_list[1:]

        return 0, file_list

# --------------------------------------------------

class BFDSink(BFD, pump.Sink):
    """Can write to backup-file/directory layout."""

    def __init__(self, opts, spec, source_bucket, source_node,
                 source_map, sink_map, ctl, cur):
        super(BFDSink, self).__init__(opts, spec, source_bucket, source_node,
                                      source_map, sink_map, ctl, cur)
        self.mode = "full"
        self.init_worker(BFDSink.run)

    @staticmethod
    def run(self):
        """Worker thread to asynchronously store incoming batches into db."""
        s = "INSERT INTO cbb_msg (cmd, vbucket_id, key, flg, exp, cas, meta, val, seqno, \
            dtype, meta_size, conf_res) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
        db = None
        cbb = 0       # Current cbb file NUM, like data-NUM.cbb.
        cbb_bytes = 0 # Current cbb msg value bytes total.
        db_dir = None
        future = None # Initialized so early error paths cannot hit a NameError
                      # before the first pull_next_batch() call.
        cbb_max_bytes = \
            self.opts.extra.get("cbb_max_mb", 100000) * 1024 * 1024
        _, dep, _, _ = BFD.find_seqno(self.opts, self.spec, self.bucket_name(),
                                      self.node_name(), self.mode)

        confResType = "seqno"
        if "conflictResolutionType" in self.source_bucket:
            confResType = self.source_bucket["conflictResolutionType"]

        seqno_map = {}
        for i in range(BFD.NUM_VBUCKET):
            seqno_map[i] = 0
        while not self.ctl['stop']:
            if not db:
                rv, db, db_dir = self.create_db(cbb)
                if rv != 0:
                    return self.future_done(future, rv)

                meta_file = os.path.join(db_dir, "meta.json")
                json_file = open(meta_file, "w")
                toWrite = {'pred': dep, 'conflict_resolution_type': confResType}
                json.dump(toWrite, json_file, ensure_ascii=False)
                json_file.close()

            batch, future = self.pull_next_batch()
            if not batch:
                if db:
                    db.close()
                return self.future_done(future, 0)

            if db and cbb_bytes >= cbb_max_bytes:
                # Rotate to a new data-NUM.cbb file; re-open it immediately so
                # db and db_dir stay valid for the writes below.
                db.close()
                cbb += 1
                cbb_bytes = 0
                rv, db, db_dir = self.create_db(cbb)
                if rv != 0:
                    return self.future_done(future, rv)

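            # Persist the latest failover log and snapshot markers next to the
            # data files so a later incremental run's find_seqno() can resume
            # from them.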
            if (self.bucket_name(), self.node_name()) in self.cur['failoverlog']:
                BFD.write_json_file(db_dir,
                                    "failover.json",
                                    self.cur['failoverlog'][(self.bucket_name(), self.node_name())])
            if (self.bucket_name(), self.node_name()) in self.cur['snapshot']:
                BFD.write_json_file(db_dir,
                                    "snapshot_markers.json",
                                    self.cur['snapshot'][(self.bucket_name(), self.node_name())])
            try:
                c = db.cursor()

                for msg in batch.msgs:
                    cmd, vbucket_id, key, flg, exp, cas, meta, val, seqno, dtype, nmeta, conf_res = msg
                    if self.skip(key, vbucket_id):
                        continue
                    if cmd not in [couchbaseConstants.CMD_TAP_MUTATION,
                                   couchbaseConstants.CMD_TAP_DELETE,
                                   couchbaseConstants.CMD_DCP_MUTATION,
                                   couchbaseConstants.CMD_DCP_DELETE]:
                        if db:
                            db.close()
                        return self.future_done(future,
                                                "error: BFDSink bad cmd: " +
                                                str(cmd))
                    c.execute(s, (cmd, vbucket_id,
                                  sqlite3.Binary(key),
                                  flg, exp, str(cas),
                                  sqlite3.Binary(str(meta)),
                                  sqlite3.Binary(val),
                                  seqno,
                                  dtype,
                                  nmeta,
                                  conf_res))
                    cbb_bytes += len(val)
                    if seqno_map[vbucket_id] < seqno:
                        seqno_map[vbucket_id] = seqno
                db.commit()
                BFD.write_json_file(db_dir, "seqno.json", seqno_map)
                self.future_done(future, 0) # No return to keep looping.

            except sqlite3.Error, e:
                return self.future_done(future, "error: db error: " + str(e))
            except Exception, e:
                return self.future_done(future, "error: db exception: " + str(e))

    @staticmethod
    def can_handle(opts, spec):
        spec = os.path.normpath(spec)
        return (os.path.isdir(spec) or (not os.path.exists(spec) and
                                        os.path.isdir(os.path.dirname(spec))))

    @staticmethod
    def check(opts, spec, source_map):
        # TODO: (2) BFDSink - check disk space.
        # TODO: (2) BFDSink - check should report on pre-creatable directories.

        spec = os.path.normpath(spec)

        # If the backup directory already exists, it just needs to be writable.
        if os.path.exists(spec):
            if not os.path.isdir(spec):
                return "error: backup directory is not a directory: " + spec, None
            if not os.access(spec, os.W_OK):
                return "error: backup directory is not writable: " + spec, None
            return 0, None

        # Otherwise, the parent directory must exist and be writable.
        parent_dir = os.path.dirname(spec)

        if not os.path.exists(parent_dir):
            return "error: missing parent directory: " + parent_dir, None
        if not os.path.isdir(parent_dir):
            return "error: parent directory is not a directory: " + parent_dir, None
        if not os.access(parent_dir, os.W_OK):
            return "error: parent directory is not writable: " + parent_dir, None
        return 0, None

    @staticmethod
    def consume_design(opts, sink_spec, sink_map,
                       source_bucket, source_map, source_design):
        if source_design:
            fname = BFD.design_path(sink_spec,
                                    source_bucket['name'])
            try:
                rv = pump.mkdirs(fname)
                if rv:
                    return rv, None
                f = open(fname, 'w')
                f.write(source_design)
                f.close()
            except IOError, e:
                return ("error: could not write design: %s" +
                        "; exception: %s") % (fname, e), None
        return 0

    @staticmethod
    def consume_index(opts, sink_spec, sink_map,
                      source_bucket, source_map, source_design):
        if source_design:
            fname = BFD.index_path(sink_spec,
                                   source_bucket['name'])
            try:
                rv = pump.mkdirs(fname)
                if rv:
                    return rv, None
                f = open(fname, 'w')
                f.write(source_design)
                f.close()
            except IOError, e:
                return ("error: could not write index: %s" +
                        "; exception: %s") % (fname, e), None
        return 0

    def consume_batch_async(self, batch):
        return self.push_next_batch(batch, pump.SinkBatchFuture(self, batch))

    def create_db(self, num):
        rv, dir = self.mkdirs()
        if rv != 0:
            return rv, None, None

        path = dir + "/data-%s.cbb" % (string.rjust(str(num), 4, '0'))
        rv, db = create_db(path, self.opts)
        if rv != 0:
            return rv, None, None

        return 0, db, dir

    def mkdirs(self):
        """Make directories, if not already, with structure like...
           <spec>/
             YYYY-MM-DDThhmmssZ/
                YYYY-MM-DDThhmmssZ-full/
                   bucket-<BUCKETNAME>/
                     design.json
                     node-<NODE>/
                       data-<XXXX>.cbb
                YYYY-MM-DDThhmmssZ-diff/
                   bucket-<BUCKETNAME>/
                     design.json
                     node-<NODE>/
                       data-<XXXX>.cbb

           CBSE-1052: There appears to be a race condition in os.mkdir. Suppose
           two or more threads simultaneously try to create the same directory,
           or different directories with a common non-existent ancestor. Each
           checks that the directory doesn't exist, then each invokes os.mkdir;
           all but one will throw OSError due to the underlying EEXIST system
           error."""

        spec = os.path.normpath(self.spec)
        if not os.path.isdir(spec):
            try:
                os.mkdir(spec)
            except OSError as error:
                if error.errno != errno.EEXIST:
                    return "error: could not mkdir: %s; exception: %s" % (spec, error.strerror), None

        new_session = self.ctl['new_session']
        self.ctl['new_session'] = False
        d = BFD.db_dir(self.spec,
                       self.bucket_name(),
                       self.node_name(),
                       self.ctl['new_timestamp'],
                       getattr(self.opts, "mode", "diff"),
                       new_session)
        if not os.path.isdir(d):
            try:
                os.makedirs(d)
            except OSError, e:
                if not os.path.isdir(d):
                    return "error: could not mkdirs: %s; exception: %s" % (d, e), None
        return 0, d


# --------------------------------------------------

def create_db(db_path, opts):
    try:
        logging.debug("  create_db: " + db_path)

        rv, db, ver = connect_db(db_path, opts, [0])
        if rv != 0:
            logging.debug("failed to connect_db: " + db_path)
            return rv, None

        # The cas column is type text, not integer, because a sqlite
        # integer is a signed 64-bit value and cannot represent the full
        # unsigned 64-bit cas range.
        db.executescript("""
                  BEGIN;
                  CREATE TABLE cbb_msg
                     (cmd integer,
                      vbucket_id integer,
                      key blob,
                      flg integer,
                      exp integer,
                      cas text,
                      meta blob,
                      val blob,
                      seqno integer,
                      dtype integer,
                      meta_size integer,
                      conf_res integer);
                  pragma user_version=%s;
                  COMMIT;
                """ % (CBB_VERSION[2]))

        return 0, db

    except Exception, e:
        return "error: create_db exception: " + str(e), None

def connect_db(db_path, opts, version):
    try:
        # TODO: (1) BFD - connect_db - use pragma page_size.
        # TODO: (1) BFD - connect_db - use pragma max_page_count.
        # TODO: (1) BFD - connect_db - use pragma journal_mode.

        logging.debug("  connect_db: " + db_path)

        db = sqlite3.connect(db_path)
        db.text_factory = str

        cur = db.execute("pragma user_version").fetchall()[0][0]
        if cur not in version:
            logging.debug("db file is not empty: " + db_path)
            return "error: unexpected db user version: " + \
                str(cur) + " vs " + str(version) + \
                ", maybe a backup directory created by an older release is being reused", \
                None, None

        return 0, db, version.index(cur)

    except Exception, e:
        return "error: connect_db exception: " + str(e), None, None

def cleanse(d):
    """Elide passwords from a hierarchy of dicts/lists."""
    return cleanse_helper(copy.deepcopy(d))

def cleanse_helper(d):
    """Recursively, destructively elide passwords from a hierarchy of dicts/lists."""
    if type(d) is list:
        for x in d:
            cleanse_helper(x)
    elif type(d) is dict:
        for k, v in d.iteritems():
            if "assword" in k:
                d[k] = '<...ELIDED...>'
            else:
                d[k] = cleanse_helper(v)
    return d

def recursive_glob(rootdir='.', pattern='*'):
    return [os.path.join(rootdir, filename)
            for rootdir, dirnames, filenames in os.walk(rootdir)
            for filename in filenames
            if fnmatch.fnmatch(filename, pattern)]

def local_to_utc(t):
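    # Note: time.mktime() interprets the struct_time as local time, so this
    # conversion depends on the process's timezone (and DST) settings.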
    secs = time.mktime(t.timetuple())
    return datetime.datetime.utcfromtimestamp(secs)
