# xref: /4.6.0/couchbase-cli/pump_bfd.py (revision 326a629a)
#!/usr/bin/env python

import copy
import datetime
import errno
import fnmatch
import glob
import json
import logging
import os
import sys
import time
import urllib

import couchbaseConstants
import pump

# Prefer the standalone pysqlite2 bindings when installed; otherwise fall back
# to the stdlib sqlite3. `status` ends up None only if every import fails.
import_stmts = (
    'from pysqlite2 import dbapi2 as sqlite3',
    'import sqlite3',
)
for status, stmt in enumerate(import_stmts):
    try:
        exec stmt
        break
    except ImportError:
        status = None
if status is None:
    sys.exit("Error: could not import sqlite3 module")

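# Editor's note: a minimal sketch of the same import fallback written without
# exec, assuming Python 2.7+ for importlib. Illustrative only; the exec form
# above is what this module actually uses.
def _import_sqlite3_sketch():
    import importlib
    for name in ('pysqlite2.dbapi2', 'sqlite3'):
        try:
            return importlib.import_module(name)
        except ImportError:
            pass
    return None
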
CBB_VERSION = [2004, 2014, 2015]  # Known sqlite "pragma user_version" values, oldest first.

class BFD:
    """Mixin for backup-file/directory EndPoint helper methods."""
    NUM_VBUCKET = 1024

    def bucket_name(self):
        return self.source_bucket['name']

    def node_name(self):
        return self.source_node['hostname']

    @staticmethod
    def design_path(spec, bucket_name):
        bucket_path = os.path.normpath(spec) + "/bucket-" + urllib.quote_plus(bucket_name).encode('ascii')
        if os.path.isdir(bucket_path):
            return bucket_path + '/design.json'
        else:
            path, dirs = BFD.find_latest_dir(spec, None)
            if path:
                path, dirs = BFD.find_latest_dir(path, None)
                if path:
                    return path + "/bucket-" + urllib.quote_plus(bucket_name).encode('ascii') + '/design.json'
        return bucket_path + '/design.json'

    @staticmethod
    def index_path(spec, bucket_name):
        bucket_path = os.path.normpath(spec) + "/bucket-" + urllib.quote_plus(bucket_name).encode('ascii')
        if os.path.isdir(bucket_path):
            return bucket_path + '/index.json'
        else:
            path, dirs = BFD.find_latest_dir(spec, None)
            if path:
                path, dirs = BFD.find_latest_dir(path, None)
                if path:
                    return path + "/bucket-" + urllib.quote_plus(bucket_name).encode('ascii') + '/index.json'
        return bucket_path + '/index.json'

    @staticmethod
    def construct_dir(parent, bucket_name, node_name):
        return os.path.join(parent,
                    "bucket-" + urllib.quote_plus(bucket_name).encode('ascii'),
                    "node-" + urllib.quote_plus(node_name).encode('ascii'))

    @staticmethod
    def check_full_dbfiles(parent_dir):
        return glob.glob(os.path.join(parent_dir, "data-*.cbb"))

    @staticmethod
    def get_predecessors(parent_dir):
        try:
            json_file = open(os.path.join(parent_dir, "meta.json"), "r")
            json_data = json.load(json_file)
            json_file.close()
            return json_data["pred"]
        except IOError:
            return []

    @staticmethod
    def get_failover_log(parent_dir):
        try:
            filepath = os.path.join(parent_dir, "failover.json")
            if os.path.isfile(filepath):
                json_file = open(filepath, "r")
                json_data = json.load(json_file)
                json_file.close()
                return json_data
            else:
                return {}
        except IOError:
            return {}

    @staticmethod
    def list_files(opts, spec, bucket, node, pattern):
        # Stub kept for interface symmetry; BFDSource.list_files below is the
        # real implementation. Returning an empty result preserves the
        # (rv, file_list) contract if this is ever called directly.
        file_list = []
        return 0, file_list

    @staticmethod
    def write_json_file(parent_dir, filename, output_data):
        # Merge new per-vbucket data into an existing JSON file, if any; see
        # the worked sketch after this class for the merge semantics.
        filepath = os.path.join(parent_dir, filename)
        json_data = {}
        if os.path.isfile(filepath):
            # Load existing metadata generated by a previous batch run.
            try:
                json_file = open(filepath, "r")
                json_data = json.load(json_file)
                json_file.close()
            except IOError:
                pass

        for i in range(BFD.NUM_VBUCKET):
            if output_data.get(i):
                str_index = str(i)
                historic_data_exist = json_data.get(str_index)
                if historic_data_exist:
                    # Historic data shares the data type of the incoming data,
                    # so it is sufficient to check the historic type only.
                    if isinstance(json_data[str_index], int):
                        # For a seqno, keep the highest value.
                        if json_data[str_index] < output_data[i]:
                            json_data[str_index] = output_data[i]
                    elif isinstance(json_data[str_index], list):
                        # For each vbucket, keep the superset of its references.
                        if len(json_data[str_index]) < len(output_data[i]):
                            json_data[str_index] = output_data[i]
                else:
                    # First sighting of this vbucket; record the incoming data.
                    json_data[str_index] = output_data[i]

        json_file = open(filepath, "w")
        json.dump(json_data, json_file, ensure_ascii=False)
        json_file.close()

    @staticmethod
    def db_dir(spec, bucket_name, node_name, tmstamp=None, mode=None, new_session=False):
        parent_dir = os.path.normpath(spec) + \
                        '/bucket-' + urllib.quote_plus(bucket_name).encode('ascii') + \
                        '/node-' + urllib.quote_plus(node_name).encode('ascii')
        if os.path.isdir(parent_dir):
            return parent_dir

        # Check the 3.0 directory structure.
        if not tmstamp:
            tmstamp = time.strftime("%Y-%m-%dT%H%M%SZ", time.gmtime())
        parent_dir = os.path.normpath(spec)
        rootpath, dirs = BFD.find_latest_dir(parent_dir, None)
        if not rootpath or not mode or mode == "full":
            # No backup root exists yet, or a full backup was requested.
            path = os.path.join(parent_dir, tmstamp, tmstamp+"-full")
            return BFD.construct_dir(path, bucket_name, node_name)

        # Check whether any full backup exists.
        path, dirs = BFD.find_latest_dir(rootpath, "full")
        if not path:
            path = os.path.join(rootpath, tmstamp+"-full")
            return BFD.construct_dir(path, bucket_name, node_name)
        else:
            # Further check for a full backup of this bucket and node.
            path = BFD.construct_dir(path, bucket_name, node_name)
            if not os.path.isdir(path):
                return path

        if mode.find("diff") >= 0:
            path, dirs = BFD.find_latest_dir(rootpath, "diff")
            if not path or new_session:
                path = os.path.join(rootpath, tmstamp+"-diff")
                return BFD.construct_dir(path, bucket_name, node_name)
            else:
                path = BFD.construct_dir(path, bucket_name, node_name)
                if not os.path.isdir(path):
                    return path
                else:
                    path = os.path.join(rootpath, tmstamp+"-diff")
                    return BFD.construct_dir(path, bucket_name, node_name)
        elif mode.find("accu") >= 0:
            path, dirs = BFD.find_latest_dir(rootpath, "accu")
            if not path or new_session:
                path = os.path.join(rootpath, tmstamp+"-accu")
                return BFD.construct_dir(path, bucket_name, node_name)
            else:
                path = BFD.construct_dir(path, bucket_name, node_name)
                if not os.path.isdir(path):
                    return path
                else:
                    path = os.path.join(rootpath, tmstamp+"-accu")
                    return BFD.construct_dir(path, bucket_name, node_name)
        else:
            return parent_dir

    @staticmethod
    def find_latest_dir(parent_dir, mode):
        all_subdirs = []
        latest_dir = None
        if not os.path.isdir(parent_dir):
            return latest_dir, all_subdirs
        for d in os.listdir(parent_dir):
            if not mode or d.find(mode) >= 0:
                bd = os.path.join(parent_dir, d)
                if os.path.isdir(bd):
                    all_subdirs.append(bd)
        if all_subdirs:
            all_subdirs = sorted(all_subdirs, key=os.path.getmtime, reverse=True)
            latest_dir = all_subdirs[0]
        return latest_dir, all_subdirs

    @staticmethod
    def find_seqno(opts, spec, bucket_name, node_name, mode):
        seqno = {}
        dep = {}
        dep_list = []
        failover_log = {}
        snapshot_markers = {}
        for i in range(BFD.NUM_VBUCKET):
            seqno[str(i)] = 0
            dep[i] = None
            failover_log[i] = None
            snapshot_markers[i] = None

        file_list = []
        failoverlog_list = []
        snapshot_list = []
        seqno_list = []
        parent_dir = os.path.normpath(spec)

        if mode == "full":
            return seqno, dep_list, failover_log, snapshot_markers
        timedir, latest_dirs = BFD.find_latest_dir(parent_dir, None)
        if not timedir:
            return seqno, dep_list, failover_log, snapshot_markers
        fulldir, latest_dirs = BFD.find_latest_dir(timedir, "full")
        if not fulldir:
            return seqno, dep_list, failover_log, snapshot_markers

        path = BFD.construct_dir(fulldir, bucket_name, node_name)
        if not os.path.isdir(path):
            return seqno, dep_list, failover_log, snapshot_markers

        file_list.extend(recursive_glob(path, 'data-*.cbb'))
        failoverlog_list.extend(recursive_glob(path, 'failover.json'))
        snapshot_list.extend(recursive_glob(path, 'snapshot_markers.json'))
        seqno_list.extend(recursive_glob(path, 'seqno.json'))

        accudir, accu_dirs = BFD.find_latest_dir(timedir, "accu")
        if accudir:
            path = BFD.construct_dir(accudir, bucket_name, node_name)
            if os.path.isdir(path):
                file_list.extend(recursive_glob(path, 'data-*.cbb'))
                failoverlog_list.extend(recursive_glob(path, 'failover.json'))
                snapshot_list.extend(recursive_glob(path, 'snapshot_markers.json'))
                seqno_list.extend(recursive_glob(path, 'seqno.json'))
        if mode.find("diff") >= 0:
            diffdir, diff_dirs = BFD.find_latest_dir(timedir, "diff")
            if diff_dirs:
                for d in diff_dirs:
                    path = BFD.construct_dir(d, bucket_name, node_name)
                    if os.path.isdir(path):
                        file_list.extend(recursive_glob(path, 'data-*.cbb'))
                        failoverlog_list.extend(recursive_glob(path, 'failover.json'))
                        snapshot_list.extend(recursive_glob(path, 'snapshot_markers.json'))
                        seqno_list.extend(recursive_glob(path, 'seqno.json'))

        for x in sorted(seqno_list):
            try:
                json_file = open(x, "r")
                json_data = json.load(json_file)
                json_file.close()

                for vbid, seq in json_data.iteritems():
                    if not seq:
                        continue
                    if seqno.get(vbid) < seq:
                        seqno[vbid] = seq
            except IOError:
                pass

        for log_file in sorted(failoverlog_list):
            try:
                json_file = open(log_file, "r")
                json_data = json.load(json_file)
                json_file.close()

                for vbid, flogs in json_data.iteritems():
                    if not flogs:
                        continue
                    elif vbid not in failover_log:
                        failover_log[vbid] = flogs
                    else:
                        for logpair in flogs:
                            if not failover_log[vbid]:
                                failover_log[vbid] = [logpair]
                            elif logpair not in failover_log[vbid]:
                                failover_log[vbid].append(logpair)
            except IOError:
                pass

        for snapshot in sorted(snapshot_list):
            try:
                json_file = open(snapshot, "r")
                json_data = json.load(json_file)
                json_file.close()

                for vbid, markers in json_data.iteritems():
                    snapshot_markers[vbid] = markers
            except IOError:
                pass

        for i in range(BFD.NUM_VBUCKET):
            if dep[i] and dep[i] not in dep_list:
                dep_list.append(dep[i])
        return seqno, dep_list, failover_log, snapshot_markers

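# Editor's illustrative sketch (not called by the tool): the merge semantics of
# BFD.write_json_file, run against an assumed temporary directory. For integer
# entries the highest value (e.g. a seqno) wins; for list entries the longer
# list (a superset of failover-log pairs) wins.
def _example_write_json_file_merge():
    import shutil
    import tempfile
    tmp = tempfile.mkdtemp()
    try:
        BFD.write_json_file(tmp, "seqno.json", {0: 5, 1: [['uuid-a', 0]]})
        # Second batch: vbucket 0 moved to seqno 9, vbucket 1 gained a pair.
        BFD.write_json_file(tmp, "seqno.json",
                            {0: 9, 1: [['uuid-a', 0], ['uuid-b', 7]]})
        json_file = open(os.path.join(tmp, "seqno.json"), "r")
        merged = json.load(json_file)
        json_file.close()
        assert merged["0"] == 9        # Highest seqno kept.
        assert len(merged["1"]) == 2   # Longer reference list kept.
    finally:
        shutil.rmtree(tmp)
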
# --------------------------------------------------

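# Editor's illustrative sketch (not called by the tool): bucket and node names
# are percent-encoded into the on-disk layout, so e.g. a node name containing
# ":" survives as a directory name. The names below are hypothetical.
def _example_construct_dir():
    d = BFD.construct_dir("/backups/2016-01-01T000000Z/2016-01-01T000000Z-full",
                          "beer-sample", "10.1.1.20:8091")
    assert d.endswith(os.path.join("bucket-beer-sample",
                                   "node-10.1.1.20%3A8091"))
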
class BFDSource(BFD, pump.Source):
    """Can read from backup-file/directory layout."""

    def __init__(self, opts, spec, source_bucket, source_node,
                 source_map, sink_map, ctl, cur):
        super(BFDSource, self).__init__(opts, spec, source_bucket, source_node,
                                        source_map, sink_map, ctl, cur)
        self.done = False
        self.files = None
        self.cursor_db = None

    @staticmethod
    def can_handle(opts, spec):
        if os.path.isdir(spec):
            dirs = glob.glob(spec + "/bucket-*/node-*/data-*.cbb")
            if dirs:
                return True
            path, dirs = BFD.find_latest_dir(spec, None)
            if path:
                path, dirs = BFD.find_latest_dir(path, "full")
                if path:
                    return bool(glob.glob(path + "*/bucket-*/node-*/data-*.cbb"))
        return False

    @staticmethod
    def check(opts, spec):
        spec = os.path.normpath(spec)
        if not os.path.isdir(spec):
            return "error: backup_dir is not a directory: " + spec, None

        buckets = []

        bucket_dirs = glob.glob(spec + "/bucket-*")
        if not bucket_dirs:
            # Check the 3.0 directory structure.
            path, dirs = BFD.find_latest_dir(spec, None)
            if not path:
                return "error: no backup directory found: " + spec, None
            latest_dir, dirs = BFD.find_latest_dir(path, "full")
            if not latest_dir:
                return "error: no valid backup directory found: " + path, None
            bucket_dirs = glob.glob(latest_dir + "/bucket-*")

        for bucket_dir in sorted(bucket_dirs):
            if not os.path.isdir(bucket_dir):
                return "error: not a bucket directory: " + bucket_dir, None

            bucket_name = os.path.basename(bucket_dir)[len("bucket-"):].strip()
            bucket_name = urllib.unquote_plus(bucket_name).encode('ascii')
            if not bucket_name:
                return "error: bucket_name too short: " + bucket_dir, None

            bucket = { 'name': bucket_name, 'nodes': [] }
            buckets.append(bucket)

            node_dirs = glob.glob(bucket_dir + "/node-*")
            for node_dir in sorted(node_dirs):
                if not os.path.isdir(node_dir):
                    return "error: not a node directory: " + node_dir, None

                node_name = os.path.basename(node_dir)[len("node-"):].strip()
                node_name = urllib.unquote_plus(node_name).encode('ascii')
                if not node_name:
                    return "error: node_name too short: " + node_dir, None

                bucket['nodes'].append({ 'hostname': node_name })

        return 0, { 'spec': spec,
                    'buckets': buckets }

    @staticmethod
    def provide_design(opts, source_spec, source_bucket, source_map):
        fname = BFD.design_path(source_spec, source_bucket['name'])
        if os.path.exists(fname):
            try:
                f = open(fname, 'r')
                d = f.read()
                f.close()
                return 0, d
            except IOError, e:
                return ("error: could not read design: %s" +
                        "; exception: %s") % (fname, e), None
        return 0, None

    @staticmethod
    def provide_index(opts, source_spec, source_bucket, source_map):
        fname = BFD.index_path(source_spec, source_bucket['name'])
        if os.path.exists(fname):
            try:
                f = open(fname, 'r')
                d = f.read()
                f.close()
                return 0, d
            except IOError, e:
                return ("error: could not read index: %s" +
                        "; exception: %s") % (fname, e), None
        return 0, None

    def provide_batch(self):
        if self.done:
            return 0, None

        batch = pump.Batch(self)

        batch_max_size = self.opts.extra['batch_max_size']
        batch_max_bytes = self.opts.extra['batch_max_bytes']

        s = ["SELECT cmd, vbucket_id, key, flg, exp, cas, meta, val FROM cbb_msg",
             "SELECT cmd, vbucket_id, key, flg, exp, cas, meta, val, seqno, dtype, meta_size FROM cbb_msg",
             "SELECT cmd, vbucket_id, key, flg, exp, cas, meta, val, seqno, dtype, meta_size, conf_res FROM cbb_msg"]

        if self.files is None: # None != [], as self.files will shrink to [].
            g = glob.glob(BFD.db_dir(self.spec, self.bucket_name(), self.node_name()) + "/data-*.cbb")
            if not g:
                # Check the 3.0 file structure.
                rv, file_list = BFDSource.list_files(self.opts,
                                                     self.spec,
                                                     self.bucket_name(),
                                                     self.node_name(),
                                                     "data-*.cbb")
                if rv != 0:
                    return rv, None
                from_date = getattr(self.opts, "from_date", None)
                if from_date:
                    from_date = datetime.datetime.strptime(from_date, "%Y-%m-%d")

                to_date = getattr(self.opts, "to_date", None)
                if to_date:
                    to_date = datetime.datetime.strptime(to_date, "%Y-%m-%d")
                g = []
                for f in file_list:
                    mtime = datetime.datetime.fromtimestamp(os.path.getmtime(f))
                    if (not from_date or mtime >= from_date) and (not to_date or mtime <= to_date):
                        g.append(f)
            self.files = sorted(g)
        try:
            ver = 0
            while (not self.done and
                   batch.size() < batch_max_size and
                   batch.bytes < batch_max_bytes):
                if self.cursor_db is None:
                    if not self.files:
                        self.done = True
                        return 0, batch

                    rv, db, ver = connect_db(self.files[0], self.opts, CBB_VERSION)
                    if rv != 0:
                        return rv, None
                    self.files = self.files[1:]

                    cursor = db.cursor()
                    cursor.execute(s[ver])

                    self.cursor_db = (cursor, db)

                cursor, db = self.cursor_db

                row = cursor.fetchone()
                if row:
                    vbucket_id = row[1]
                    key = row[2]
                    val = row[7]

                    if self.skip(key, vbucket_id):
                        continue
                    msg = (row[0], row[1], row[2], row[3], row[4],
                           int(row[5]), # CAS as 64-bit integer, not string.
                           row[6], # revid as 64-bit integer, too.
                           row[7])
                    if ver == 2:
                        msg = msg + (row[8], row[9], row[10], row[11])
                    elif ver == 1:
                        msg = msg + (row[8], row[9], row[10], 0)
                    else:
                        msg = msg + (0, 0, 0, 0)
                    batch.append(msg, len(val))
                else:
                    if self.cursor_db:
                        self.cursor_db[0].close()
                        self.cursor_db[1].close()
                    self.cursor_db = None

            return 0, batch

        except Exception, e:
            self.done = True
            if self.cursor_db:
                self.cursor_db[0].close()
                self.cursor_db[1].close()
            self.cursor_db = None

            return "error: exception reading backup file: " + str(e), None

    @staticmethod
    def total_msgs(opts, source_bucket, source_node, source_map):
        t = 0
        file_list = glob.glob(BFD.db_dir(
                                 source_map['spec'],
                                 source_bucket['name'],
                                 source_node['hostname']) + "/data-*.cbb")
        if not file_list:
            # Check the 3.0 directory structure.
            rv, file_list = BFDSource.list_files(opts,
                                        source_map['spec'],
                                        source_bucket['name'],
                                        source_node['hostname'],
                                        "data-*.cbb")
            if rv != 0:
                return rv, None

        for x in sorted(file_list):
            rv, db, ver = connect_db(x, opts, CBB_VERSION)
            if rv != 0:
                return rv, None

            cur = db.cursor()
            # TODO: (1) BFDSource - COUNT(*) is not indexed.
            cur.execute("SELECT COUNT(*) FROM cbb_msg;")
            t = t + cur.fetchone()[0]
            cur.close()
            db.close()

        return 0, t

    @staticmethod
    def list_files(opts, spec, bucket, node, pattern):
        file_list = []
        prec_list = []
        path, dirs = BFD.find_latest_dir(spec, None)
        if not path:
            return "error: no valid data in path: %s" % spec, None

        path, dirs = BFD.find_latest_dir(path, None)
        if not path:
            return 0, file_list

        for d in dirs:
            latest_dir = BFD.construct_dir(d, bucket, node)
            file_list.extend(glob.glob(os.path.join(latest_dir, pattern)))
            for p in BFD.get_predecessors(latest_dir):
                prec_list.append(os.path.dirname(p))
            if len(prec_list) > 0:
                break

        # Walk the predecessor chain recorded in each meta.json, breadth-first,
        # collecting matching files from every ancestor backup directory.
        while len(prec_list) > 0:
            for d in glob.glob(os.path.join(prec_list[0], pattern)):
                if d not in file_list:
                    file_list.append(d)
            for p in BFD.get_predecessors(prec_list[0]):
                dirname = os.path.dirname(p)
                if dirname not in prec_list:
                    prec_list.append(dirname)
            prec_list = prec_list[1:]

        return 0, file_list

# --------------------------------------------------

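# Editor's illustrative sketch (not called by the tool): BFD.find_latest_dir
# picks the most recently modified subdirectory whose name contains `mode`,
# which is how the code above locates the newest "full", "diff" or "accu"
# session under a timestamp root. Paths here are hypothetical temp dirs.
def _example_find_latest_dir():
    import shutil
    import tempfile
    root = tempfile.mkdtemp()
    try:
        os.mkdir(os.path.join(root, "2016-01-01T000000Z-full"))
        time.sleep(0.01)  # Ensure distinct mtimes for the mtime sort.
        os.mkdir(os.path.join(root, "2016-01-02T000000Z-diff"))
        latest, all_dirs = BFD.find_latest_dir(root, "diff")
        assert latest.endswith("-diff")
        assert len(all_dirs) == 1  # Only names containing "diff" match.
    finally:
        shutil.rmtree(root)
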
class BFDSink(BFD, pump.Sink):
    """Can write to backup-file/directory layout."""

    def __init__(self, opts, spec, source_bucket, source_node,
                 source_map, sink_map, ctl, cur):
        super(BFDSink, self).__init__(opts, spec, source_bucket, source_node,
                                      source_map, sink_map, ctl, cur)
        self.mode = "full"
        self.init_worker(BFDSink.run)

    @staticmethod
    def run(self):
        """Worker thread to asynchronously store incoming batches into db."""
        s = "INSERT INTO cbb_msg (cmd, vbucket_id, key, flg, exp, cas, meta, val, seqno, \
            dtype, meta_size, conf_res) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
        db = None
        cbb = 0       # Current cbb file NUM, as in data-NUM.cbb.
        cbb_bytes = 0 # Current cbb msg value bytes total.
        db_dir = None
        future = None # Future of the most recently pulled batch, if any.
        cbb_max_bytes = \
            self.opts.extra.get("cbb_max_mb", 100000) * 1024 * 1024
        _, dep, _, _ = BFD.find_seqno(self.opts,
                                      self.spec,
                                      self.bucket_name(),
                                      self.node_name(),
                                      self.mode)
        seqno_map = {}
        for i in range(BFD.NUM_VBUCKET):
            seqno_map[i] = 0
        while not self.ctl['stop']:
            if not db:
                rv, db, db_dir = self.create_db(cbb)
                if rv != 0:
                    # On the first iteration there is no batch future yet to
                    # signal, so return the error directly.
                    if future:
                        return self.future_done(future, rv)
                    return rv

                meta_file = os.path.join(db_dir, "meta.json")
                json_file = open(meta_file, "w")
                json.dump({'pred': dep}, json_file, ensure_ascii=False)
                json_file.close()

            batch, future = self.pull_next_batch()
            if not batch:
                if db:
                    db.close()
                return self.future_done(future, 0)

            if db and cbb_bytes >= cbb_max_bytes:
                # Rotate to the next data-NUM.cbb file, reopening immediately
                # so the writes below always have a live db and db_dir.
                db.close()
                cbb += 1
                cbb_bytes = 0
                rv, db, db_dir = self.create_db(cbb)
                if rv != 0:
                    return self.future_done(future, rv)

            if (self.bucket_name(), self.node_name()) in self.cur['failoverlog']:
                BFD.write_json_file(db_dir,
                                    "failover.json",
                                    self.cur['failoverlog'][(self.bucket_name(), self.node_name())])
            if (self.bucket_name(), self.node_name()) in self.cur['snapshot']:
                BFD.write_json_file(db_dir,
                                    "snapshot_markers.json",
                                    self.cur['snapshot'][(self.bucket_name(), self.node_name())])
            try:
                c = db.cursor()

                for msg in batch.msgs:
                    cmd, vbucket_id, key, flg, exp, cas, meta, val, seqno, dtype, nmeta, conf_res = msg
                    if self.skip(key, vbucket_id):
                        continue
                    if cmd not in [couchbaseConstants.CMD_TAP_MUTATION,
                                   couchbaseConstants.CMD_TAP_DELETE,
                                   couchbaseConstants.CMD_DCP_MUTATION,
                                   couchbaseConstants.CMD_DCP_DELETE]:
                        if db:
                            db.close()
                        return self.future_done(future,
                                                "error: BFDSink bad cmd: " +
                                                str(cmd))
                    c.execute(s, (cmd, vbucket_id,
                                  sqlite3.Binary(key),
                                  flg, exp, str(cas),
                                  sqlite3.Binary(str(meta)),
                                  sqlite3.Binary(val),
                                  seqno,
                                  dtype,
                                  nmeta,
                                  conf_res))
                    cbb_bytes += len(val)
                    if seqno_map[vbucket_id] < seqno:
                        seqno_map[vbucket_id] = seqno
                db.commit()
                BFD.write_json_file(db_dir, "seqno.json", seqno_map)
                self.future_done(future, 0) # No return, to keep looping.

            except sqlite3.Error, e:
                return self.future_done(future, "error: db error: " + str(e))
            except Exception, e:
                return self.future_done(future, "error: db exception: " + str(e))

    @staticmethod
    def can_handle(opts, spec):
        spec = os.path.normpath(spec)
        return (os.path.isdir(spec) or (not os.path.exists(spec) and
                                        os.path.isdir(os.path.dirname(spec))))

    @staticmethod
    def check(opts, spec, source_map):
        # TODO: (2) BFDSink - check disk space.
        # TODO: (2) BFDSink - check should report on pre-creatable directories.

        spec = os.path.normpath(spec)

        # If the backup directory exists, it must be a writable directory.
        if os.path.exists(spec):
            if not os.path.isdir(spec):
                return "error: backup directory is not a directory: " + spec, None
            if not os.access(spec, os.W_OK):
                return "error: backup directory is not writable: " + spec, None
            return 0, None

        # Otherwise, the parent directory must exist and be writable.
        parent_dir = os.path.dirname(spec)

        if not os.path.exists(parent_dir):
            return "error: missing parent directory: " + parent_dir, None
        if not os.path.isdir(parent_dir):
            return "error: parent directory is not a directory: " + parent_dir, None
        if not os.access(parent_dir, os.W_OK):
            return "error: parent directory is not writable: " + parent_dir, None
        return 0, None

    @staticmethod
    def consume_design(opts, sink_spec, sink_map,
                       source_bucket, source_map, source_design):
        if source_design:
            fname = BFD.design_path(sink_spec,
                                    source_bucket['name'])
            try:
                rv = pump.mkdirs(fname)
                if rv:
                    return rv
                f = open(fname, 'w')
                f.write(source_design)
                f.close()
            except IOError, e:
                return ("error: could not write design: %s" +
                        "; exception: %s") % (fname, e)
        return 0

    @staticmethod
    def consume_index(opts, sink_spec, sink_map,
                      source_bucket, source_map, source_design):
        if source_design:
            fname = BFD.index_path(sink_spec,
                                   source_bucket['name'])
            try:
                rv = pump.mkdirs(fname)
                if rv:
                    return rv
                f = open(fname, 'w')
                f.write(source_design)
                f.close()
            except IOError, e:
                return ("error: could not write index: %s" +
                        "; exception: %s") % (fname, e)
        return 0

    def consume_batch_async(self, batch):
        return self.push_next_batch(batch, pump.SinkBatchFuture(self, batch))

    def create_db(self, num):
        rv, db_dir = self.mkdirs()
        if rv != 0:
            return rv, None, None

        path = db_dir + "/data-%s.cbb" % (str(num).rjust(4, '0'))
        rv, db = create_db(path, self.opts)
        if rv != 0:
            return rv, None, None

        return 0, db, db_dir

    def mkdirs(self):
        """Make directories, if not already, with structure like...
           <spec>/
             YYYY-MM-DDThhmmssZ/
                YYYY-MM-DDThhmmssZ-full/
                   bucket-<BUCKETNAME>/
                     design.json
                     node-<NODE>/
                       data-<XXXX>.cbb
                YYYY-MM-DDThhmmssZ-diff/
                   bucket-<BUCKETNAME>/
                     design.json
                     node-<NODE>/
                       data-<XXXX>.cbb
        """
        # CBSE-1052: There appears to be a race condition in os.mkdir. Suppose
        # two or more threads simultaneously try to create the same directory,
        # or different directories with a common non-existent ancestor. Each
        # checks that the directory doesn't exist, then each invokes os.mkdir;
        # one of them will throw OSError due to the underlying EEXIST system
        # error.
        spec = os.path.normpath(self.spec)
        if not os.path.isdir(spec):
            try:
                os.mkdir(spec)
            except OSError as error:
                if error.errno != errno.EEXIST:
                    return "error: could not mkdir: %s; exception: %s" % (spec, error.strerror), None

        new_session = self.ctl['new_session']
        self.ctl['new_session'] = False
        d = BFD.db_dir(self.spec,
                       self.bucket_name(),
                       self.node_name(),
                       self.ctl['new_timestamp'],
                       getattr(self.opts, "mode", "diff"),
                       new_session)
        if not os.path.isdir(d):
            try:
                os.makedirs(d)
            except OSError, e:
                if not os.path.isdir(d):
                    return "error: could not mkdirs: %s; exception: %s" % (d, e), None
        return 0, d


# --------------------------------------------------

def create_db(db_path, opts):
    try:
        logging.debug("  create_db: " + db_path)

        rv, db, ver = connect_db(db_path, opts, [0])  # A new, empty db has user_version 0.
        if rv != 0:
            logging.debug("failed to connect_db: " + db_path)
            return rv, None

        # The cas column is type text, not integer, because a sqlite
        # integer is 63 bits instead of 64 bits.
        db.executescript("""
                  BEGIN;
                  CREATE TABLE cbb_msg
                     (cmd integer,
                      vbucket_id integer,
                      key blob,
                      flg integer,
                      exp integer,
                      cas text,
                      meta blob,
                      val blob,
                      seqno integer,
                      dtype integer,
                      meta_size integer,
                      conf_res integer);
                  pragma user_version=%s;
                  COMMIT;
                """ % (CBB_VERSION[2]))

        return 0, db

    except Exception, e:
        return "error: create_db exception: " + str(e), None

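# Editor's illustrative sketch (not called by the tool): create_db followed by
# connect_db round-trips the newest CBB_VERSION through "pragma user_version",
# and connect_db's returned index selects the matching SELECT/INSERT column
# set. The temp path is hypothetical; opts is not consulted by these helpers
# in this version, so None suffices here.
def _example_create_and_reopen_db():
    import shutil
    import tempfile
    tmp = tempfile.mkdtemp()
    try:
        path = os.path.join(tmp, "data-0000.cbb")
        rv, db = create_db(path, None)
        assert rv == 0
        db.close()
        rv, db, ver = connect_db(path, None, CBB_VERSION)
        assert rv == 0
        assert ver == 2  # Index of 2015, the newest schema version.
        db.close()
    finally:
        shutil.rmtree(tmp)
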
def connect_db(db_path, opts, version):
    try:
        # TODO: (1) BFD - connect_db - use pragma page_size.
        # TODO: (1) BFD - connect_db - use pragma max_page_count.
        # TODO: (1) BFD - connect_db - use pragma journal_mode.

        logging.debug("  connect_db: " + db_path)

        db = sqlite3.connect(db_path)
        db.text_factory = str

        cur = db.execute("pragma user_version").fetchall()[0][0]
        if cur not in version:
            logging.debug("db file is not empty: " + db_path)
            return "error: unexpected db user version: " + \
                str(cur) + " vs " + str(version) + \
                ", maybe a backup directory created by an older release was reused", \
                None, None

        return 0, db, version.index(cur)

    except Exception, e:
        return "error: connect_db exception: " + str(e), None, None

def cleanse(d):
    """Elide passwords from a hierarchy of dicts/lists."""
    return cleanse_helper(copy.deepcopy(d))

def cleanse_helper(d):
    """Recursively, destructively elide passwords from a hierarchy of dicts/lists."""
    if type(d) is list:
        for x in d:
            cleanse_helper(x)
    elif type(d) is dict:
        for k, v in d.iteritems():
            if "assword" in k:
                d[k] = '<...ELIDED...>'
            else:
                d[k] = cleanse_helper(v)
    return d

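# Editor's illustrative sketch (not called by the tool): cleanse() deep-copies
# its argument, so the original mapping keeps its secrets while any key
# containing "assword" is elided in the returned copy. The map below is
# hypothetical.
def _example_cleanse():
    source_map = {'spec': 'http://HOST:8091',
                  'password': 'secret',
                  'nodes': [{'adminPassword': 'hunter2'}]}
    cleaned = cleanse(source_map)
    assert cleaned['password'] == '<...ELIDED...>'
    assert cleaned['nodes'][0]['adminPassword'] == '<...ELIDED...>'
    assert source_map['password'] == 'secret'  # Original left untouched.
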
def recursive_glob(rootdir='.', pattern='*'):
    return [os.path.join(dirpath, filename)
            for dirpath, dirnames, filenames in os.walk(rootdir)
            for filename in filenames
            if fnmatch.fnmatch(filename, pattern)]

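# Editor's illustrative sketch (not called by the tool): recursive_glob is the
# os.walk-based analogue of glob.glob used to find files anywhere under the
# nested 3.0 backup layout, e.g. every data-*.cbb beneath a session directory.
def _example_recursive_glob():
    import shutil
    import tempfile
    root = tempfile.mkdtemp()
    try:
        nested = os.path.join(root, "bucket-default", "node-n1")
        os.makedirs(nested)
        open(os.path.join(nested, "data-0000.cbb"), "w").close()
        found = recursive_glob(root, "data-*.cbb")
        assert len(found) == 1  # Found despite being two levels deep.
    finally:
        shutil.rmtree(root)
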
def local_to_utc(t):
    secs = time.mktime(t.timetuple())
    return datetime.datetime.utcfromtimestamp(secs)

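# Editor's illustrative sketch (not called by the tool): local_to_utc shifts a
# naive local datetime into UTC, matching the UTC "%Y-%m-%dT%H%M%SZ" stamps
# that BFD.db_dir uses to name backup session directories.
def _example_local_to_utc():
    local_noon = datetime.datetime(2016, 6, 1, 12, 0, 0)
    utc = local_to_utc(local_noon)
    # The difference is the local UTC offset (zero when running in UTC).
    assert isinstance(utc, datetime.datetime)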