xref: /6.0.3/couchbase-cli/pump_bfd.py (revision 978253c1)
1#!/usr/bin/env python
2
3import copy
4import errno
5import glob
6import logging
7import os
8import json
9import string
10import sys
11import datetime
12import time
13import urllib
14import fnmatch
15import sqlite3
16
17import couchbaseConstants
18import pump
19
20CBB_VERSION = [2004, 2014, 2015] # sqlite pragma user version.
21
22DDOC_FILE_NAME = "design.json"
23INDEX_FILE_NAME = "index.json"
24FTS_FILE_NAME = "fts_index.json"
25
26class BFD:
27    """Mixin for backup-file/directory EndPoint helper methods."""
28    NUM_VBUCKET = 1024
29
30    def bucket_name(self):
31        return self.source_bucket['name']
32
33    def node_name(self):
34        return self.source_node['hostname']
35
36    @staticmethod
37    def get_file_path(spec, bucket_name, file):
38        bucket_name = urllib.quote_plus(bucket_name).encode('ascii')
39        bucket_path = os.path.join(os.path.normpath(spec), "bucket-" + bucket_name)
40        if os.path.isdir(bucket_path):
41            return os.path.join(bucket_path, file)
42        else:
43            path, dirs = BFD.find_latest_dir(spec, None)
44            if path:
45                path, dirs = BFD.find_latest_dir(path, None)
46                if path:
47                    return os.path.join(path, "bucket-" + bucket_name, file)
48        return os.path.join(bucket_path, file)
49
50    @staticmethod
51    def construct_dir(parent, bucket_name, node_name):
52        return os.path.join(parent,
53                    "bucket-" + urllib.quote_plus(bucket_name).encode('ascii'),
54                    "node-" + urllib.quote_plus(node_name).encode('ascii'))
55
56    @staticmethod
57    def check_full_dbfiles(parent_dir):
58        return glob.glob(os.path.join(parent_dir, "data-*.cbb"))
59
60    @staticmethod
61    def get_predecessors(parent_dir):
62        try:
63            json_file = open(os.path.join(parent_dir, "meta.json"), "r")
64            json_data = json.load(json_file)
65            json_file.close()
66            return json_data["pred"]
67        except IOError:
68            return []
69
70    @staticmethod
71    def get_server_version(parent_dir):
72        try:
73            json_file = open(os.path.join(parent_dir, "meta.json"), "r")
74            json_data = json.load(json_file)
75            json_file.close()
76            if "version" in json_data:
77                return json_data["version"]
78            return "0.0.0"
79        except IOError:
80            return "0.0.0"
81
82    @staticmethod
83    def get_failover_log(parent_dir):
84        try:
85            filepath = os.path.join(parent_dir, "failover.json")
86            if os.path.isfile(filepath):
87                json_file = open(filepath, "r")
88                json_data = json.load(json_file)
89                json_file.close()
90                return json_data
91            else:
92                return {}
93        except IOError:
94            return {}
95
96    @staticmethod
97    def list_files(opts, spec, bucket, node, pattern):
98        file_list = []
99        prec_list = []
100
101    @staticmethod
102    def write_json_file(parent_dir, filename, output_data):
103        filepath = os.path.join(parent_dir, filename)
104        json_data = {}
105        if os.path.isfile(filepath):
106            #load into existed meta data generated from previous batch run
107            try:
108                json_file = open(filepath, "r")
109                json_data = json.load(json_file)
110                json_file.close()
111            except IOError:
112                pass
113
114        for i in range(BFD.NUM_VBUCKET):
115            if output_data.get(i):
116                str_index = str(i)
117                historic_data_exist = json_data.get(str_index)
118                if historic_data_exist:
119                    #Historic data will share same data type as incoming ones.
120                    #It will be sufficient to only check type for historic data
121                    if isinstance(json_data[str_index], int):
122                        #For seqno, we want to keep the highest seqno
123                        if json_data[str_index] < output_data[i]:
124                            json_data[str_index] = output_data[i]
125                    elif isinstance(json_data[str_index], list):
126                        #For each vbucket, we want to get the superset of its references
127                        if len(json_data[str_index]) <= len(output_data[i]):
128                            json_data[str_index] = output_data[i]
129                else:
130                    #Bookkeeping the incoming one.
131                    json_data[str_index] = output_data[i]
132
133        json_file = open(filepath, "w")
134        json.dump(json_data, json_file, ensure_ascii=False)
135        json_file.close()
136
137    @staticmethod
138    def db_dir(spec, bucket_name, node_name, tmstamp=None, mode=None, new_session=False):
139        parent_dir = os.path.normpath(spec) + \
140                        '/bucket-' + urllib.quote_plus(bucket_name).encode('ascii') + \
141                        '/node-' + urllib.quote_plus(node_name).encode('ascii')
142        if os.path.isdir(parent_dir):
143            return parent_dir
144
145        #check 3.0 directory structure
146        if not tmstamp:
147            tmstamp = time.strftime("%Y-%m-%dT%H%M%SZ", time.gmtime())
148        parent_dir = os.path.normpath(spec)
149        rootpath, dirs = BFD.find_latest_dir(parent_dir, None)
150        if not rootpath or not mode or mode == "full":
151            # no any backup roots exists
152            path = os.path.join(parent_dir, tmstamp, tmstamp+"-full")
153            return BFD.construct_dir(path, bucket_name, node_name)
154
155        #check if any full backup exists
156        path, dirs = BFD.find_latest_dir(rootpath, "full")
157        if not path:
158            path = os.path.join(rootpath, tmstamp+"-full")
159            return BFD.construct_dir(path, bucket_name, node_name)
160        else:
161            #further check full backup for this bucket and node
162            path = BFD.construct_dir(path, bucket_name, node_name)
163            if not os.path.isdir(path):
164                return path
165
166        if mode.find("diff") >= 0:
167            path, dirs = BFD.find_latest_dir(rootpath, "diff")
168            if not path or new_session:
169                path = os.path.join(rootpath, tmstamp+"-diff")
170                return BFD.construct_dir(path, bucket_name, node_name)
171            else:
172                path = BFD.construct_dir(path, bucket_name, node_name)
173                if not os.path.isdir(path):
174                    return path
175                else:
176                    path = os.path.join(rootpath, tmstamp+"-diff")
177                    return BFD.construct_dir(path, bucket_name, node_name)
178        elif mode.find("accu") >= 0:
179            path, dirs = BFD.find_latest_dir(rootpath, "accu")
180            if not path or new_session:
181                path = os.path.join(rootpath, tmstamp+"-accu")
182                return BFD.construct_dir(path, bucket_name, node_name)
183            else:
184                path = BFD.construct_dir(path, bucket_name, node_name)
185                if not os.path.isdir(path):
186                    return path
187                else:
188                    path = os.path.join(rootpath, tmstamp+"-accu")
189                    return BFD.construct_dir(path, bucket_name, node_name)
190        else:
191            return parent_dir
192
193    @staticmethod
194    def find_latest_dir(parent_dir, mode):
195        all_subdirs = []
196        latest_dir = None
197        if not os.path.isdir(parent_dir):
198            return latest_dir, all_subdirs
199        for d in os.listdir(parent_dir):
200            if not mode or d.find(mode) >= 0:
201                bd = os.path.join(parent_dir, d)
202                if os.path.isdir(bd):
203                    all_subdirs.append(bd)
204        if all_subdirs:
205            all_subdirs = sorted(all_subdirs,key=os.path.getmtime, reverse=True)
206            latest_dir = all_subdirs[0]
207        return latest_dir, all_subdirs
208
209    @staticmethod
210    def find_seqno(opts, spec, bucket_name, node_name, mode):
211        seqno = {}
212        dep = {}
213        dep_list = []
214        failover_log = {}
215        snapshot_markers = {}
216        for i in range(BFD.NUM_VBUCKET):
217            seqno[str(i)] = 0
218            dep[i] = None
219            failover_log[i] = None
220            snapshot_markers[i] = None
221
222        file_list = []
223        failoverlog_list = []
224        snapshot_list = []
225        seqno_list = []
226        parent_dir = os.path.normpath(spec)
227
228        if mode == "full":
229            return seqno, dep_list, failover_log, snapshot_markers
230        timedir,latest_dirs = BFD.find_latest_dir(parent_dir, None)
231        if not timedir:
232            return seqno, dep_list, failover_log, snapshot_markers
233        fulldir, latest_dirs = BFD.find_latest_dir(timedir, "full")
234        if not fulldir:
235            return seqno, dep_list, failover_log, snapshot_markers
236
237        path = BFD.construct_dir(fulldir, bucket_name, node_name)
238        if not os.path.isdir(path):
239            return seqno, dep_list, failover_log, snapshot_markers
240
241        last_backup = BFD.construct_dir(fulldir, bucket_name, node_name)
242        file_list.extend(recursive_glob(path, 'data-*.cbb'))
243        failoverlog_list.extend(recursive_glob(path, 'failover.json'))
244        snapshot_list.extend(recursive_glob(path, 'snapshot_markers.json'))
245        seqno_list.extend(recursive_glob(path, 'seqno.json'))
246
247        if mode.find("accu") < 0:
248            accudir, accu_dirs = BFD.find_latest_dir(timedir, "accu")
249            if accudir:
250                path = BFD.construct_dir(accudir, bucket_name, node_name)
251                last_backup = path
252                if os.path.isdir(path):
253                    file_list.extend(recursive_glob(path, 'data-*.cbb'))
254                    failoverlog_list.extend(recursive_glob(path, 'failover.json'))
255                    snapshot_list.extend(recursive_glob(path, 'snapshot_markers.json'))
256                    seqno_list.extend(recursive_glob(path, 'seqno.json'))
257        if mode.find("diff") >= 0:
258            diffdir, diff_dirs = BFD.find_latest_dir(timedir, "diff")
259            if diff_dirs:
260                last_backup = BFD.construct_dir(diffdir, bucket_name, node_name)
261                for dir in diff_dirs:
262                    path = BFD.construct_dir(dir, bucket_name, node_name)
263                    if os.path.isdir(path):
264                        file_list.extend(recursive_glob(path, 'data-*.cbb'))
265                        failoverlog_list.extend(recursive_glob(path, 'failover.json'))
266                        snapshot_list.extend(recursive_glob(path, 'snapshot_markers.json'))
267                        seqno_list.extend(recursive_glob(path, 'seqno.json'))
268
269        if BFD.get_server_version(last_backup) < "5.0.0":
270            return dict(), list(), dict(), dict()
271
272        for x in sorted(seqno_list):
273            try:
274                json_file = open(x, "r")
275                json_data = json.load(json_file)
276                json_file.close()
277
278                for vbid, seq in json_data.iteritems():
279                    if not seq:
280                        continue
281                    if seqno.get(vbid) < seq:
282                        seqno[vbid] = seq
283            except IOError:
284                pass
285
286        for log_file in sorted(failoverlog_list):
287            try:
288                json_file = open(log_file, "r")
289                json_data = json.load(json_file)
290                json_file.close()
291
292                for vbid, flogs in json_data.iteritems():
293                    if not flogs:
294                        continue
295                    elif vbid not in failover_log.keys():
296                        failover_log[vbid] = flogs
297                    else:
298                        for logpair in flogs:
299                            if not failover_log[vbid]:
300                                failover_log[vbid] = [logpair]
301                            elif logpair not in failover_log[vbid]:
302                                failover_log[vbid].append(logpair)
303            except IOError:
304                pass
305
306        for snapshot in sorted(snapshot_list):
307            try:
308                json_file = open(snapshot, "r")
309                json_data = json.load(json_file)
310                json_file.close()
311
312                for vbid, markers in json_data.iteritems():
313                    snapshot_markers[vbid] = markers
314            except IOError:
315                pass
316
317        for i in range(BFD.NUM_VBUCKET):
318            if dep[i] and dep[i] not in dep_list:
319                dep_list.append(dep[i])
320        return seqno, dep_list, failover_log, snapshot_markers
321
322# --------------------------------------------------
323
324class BFDSource(BFD, pump.Source):
325    """Can read from backup-file/directory layout."""
326
327    def __init__(self, opts, spec, source_bucket, source_node,
328                 source_map, sink_map, ctl, cur):
329        super(BFDSource, self).__init__(opts, spec, source_bucket, source_node,
330                                        source_map, sink_map, ctl, cur)
331        self.done = False
332        self.files = None
333        self.cursor_db = None
334
335    @staticmethod
336    def can_handle(opts, spec):
337        if os.path.isdir(spec):
338            dirs = glob.glob(spec + "/bucket-*/node-*/data-*.cbb")
339            if dirs:
340                return True
341            path, dirs = BFD.find_latest_dir(spec, None)
342            if path:
343                path, dirs = BFD.find_latest_dir(path, "full")
344                if path:
345                    return glob.glob(path + "*/bucket-*/node-*/data-*.cbb")
346        return False
347
348    @staticmethod
349    def check(opts, spec):
350        spec = os.path.normpath(spec)
351        if not os.path.isdir(spec):
352            return "error: backup_dir is not a directory: " + spec, None
353
354        buckets = []
355
356        bucket_dirs = glob.glob(spec + "/bucket-*")
357        if not bucket_dirs:
358            #check 3.0 directory structure
359            path, dirs = BFD.find_latest_dir(spec, None)
360            if not path:
361                return "error: no backup directory found: " + spec, None
362            latest_dir, dir = BFD.find_latest_dir(path, "full")
363            if not latest_dir:
364                return "error: no valid backup directory found: " + path, None
365            bucket_dirs = glob.glob(latest_dir + "/bucket-*")
366
367        for bucket_dir in sorted(bucket_dirs):
368            if not os.path.isdir(bucket_dir):
369                return "error: not a bucket directory: " + bucket_dir, None
370
371            bucket_name = os.path.basename(bucket_dir)[len("bucket-"):].strip()
372            bucket_name = urllib.unquote_plus(bucket_name).encode('ascii')
373            if not bucket_name:
374                return "error: bucket_name too short: " + bucket_dir, None
375
376            bucket = { 'name': bucket_name, 'nodes': [] }
377            buckets.append(bucket)
378
379            node_dirs = glob.glob(bucket_dir + "/node-*")
380            for node_dir in sorted(node_dirs):
381                if not os.path.isdir(node_dir):
382                    return "error: not a node directory: " + node_dir, None
383
384                node_name = os.path.basename(node_dir)[len("node-"):].strip()
385                node_name = urllib.unquote_plus(node_name).encode('ascii')
386                if not node_name:
387                    return "error: node_name too short: " + node_dir, None
388
389                bucket['nodes'].append({ 'hostname': node_name })
390
391        return 0, { 'spec': spec,
392                    'buckets': buckets }
393
394    @staticmethod
395    def provide_design(opts, source_spec, source_bucket, source_map):
396        return BFDSource.read_index_file(source_spec, source_bucket, DDOC_FILE_NAME)
397
398    @staticmethod
399    def provide_index(opts, source_spec, source_bucket, source_map):
400        return BFDSource.read_index_file(source_spec, source_bucket, INDEX_FILE_NAME)
401
402    @staticmethod
403    def provide_fts_index(opts, source_spec, source_bucket, source_map):
404        return BFDSource.read_index_file(source_spec, source_bucket, FTS_FILE_NAME)
405
406    @staticmethod
407    def read_index_file(source_spec, source_bucket, file_name):
408        path = BFD.get_file_path(source_spec, source_bucket['name'], file_name)
409        if os.path.exists(path):
410            try:
411                f = open(path, 'r')
412                d = f.read()
413                f.close()
414                return 0, d
415            except IOError, e:
416                return ("error: could not read %s; exception: %s") % (path, e), None
417        return 0, None
418
419    def get_conflict_resolution_type(self):
420        dir = BFD.construct_dir(self.source_map['spec'],
421                                self.source_bucket['name'],
422                                self.source_node['hostname'])
423        metafile = os.path.join(dir, "meta.json")
424        try:
425            json_file = open(metafile, "r")
426            json_data = json.load(json_file)
427            json_file.close()
428            if "conflict_resolution_type" in json_data:
429                return json_data["conflict_resolution_type"]
430            return "seqno"
431        except IOError:
432            return "seqno"
433
434    def provide_batch(self):
435        if self.done:
436            return 0, None
437
438        batch = pump.Batch(self)
439
440        batch_max_size = self.opts.extra['batch_max_size']
441        batch_max_bytes = self.opts.extra['batch_max_bytes']
442
443        s = ["SELECT cmd, vbucket_id, key, flg, exp, cas, meta, val FROM cbb_msg",
444             "SELECT cmd, vbucket_id, key, flg, exp, cas, meta, val, seqno, dtype, meta_size FROM cbb_msg",
445             "SELECT cmd, vbucket_id, key, flg, exp, cas, meta, val, seqno, dtype, meta_size, conf_res FROM cbb_msg"]
446
447        if self.files is None: # None != [], as self.files will shrink to [].
448            g =  glob.glob(BFD.db_dir(self.spec, self.bucket_name(), self.node_name()) + "/data-*.cbb")
449            if not g:
450                #check 3.0 file structure
451                rv, file_list = BFDSource.list_files(self.opts,
452                                                     self.spec,
453                                                     self.bucket_name(),
454                                                     self.node_name(),
455                                                     "data-*.cbb")
456                if rv != 0:
457                    return rv, None
458                from_date = getattr(self.opts, "from_date", None)
459                if from_date:
460                    from_date = datetime.datetime.strptime(from_date, "%Y-%m-%d")
461
462                to_date = getattr(self.opts, "to_date", None)
463                if to_date:
464                    to_date = datetime.datetime.strptime(to_date, "%Y-%m-%d")
465                g = []
466                for f in file_list:
467                    mtime = datetime.datetime.fromtimestamp(os.path.getmtime(f))
468                    if (not from_date or mtime >= from_date) and (not to_date or mtime <= to_date):
469                        g.append(f)
470            self.files = sorted(g)
471        try:
472            ver = 0
473            while (not self.done and
474                   batch.size() < batch_max_size and
475                   batch.bytes < batch_max_bytes):
476                if self.cursor_db is None:
477                    if not self.files:
478                        self.done = True
479                        return 0, batch
480
481                    rv, db, ver = connect_db(self.files[0], self.opts, CBB_VERSION)
482                    if rv != 0:
483                        return rv, None
484                    self.files = self.files[1:]
485
486                    cursor = db.cursor()
487                    cursor.execute(s[ver])
488
489                    self.cursor_db = (cursor, db, ver)
490
491                cursor, db, ver = self.cursor_db
492
493                row = cursor.fetchone()
494                if row:
495                    vbucket_id = row[1]
496                    key = row[2]
497                    val = row[7]
498
499                    if self.skip(key, vbucket_id):
500                        continue
501                    msg = (row[0], row[1], row[2], row[3], row[4],
502                           int(row[5]), # CAS as 64-bit integer not string.
503                           row[6], # revid as 64-bit integer too
504                           row[7])
505                    if ver == 2:
506                        msg = msg + (row[8], row[9], row[10], row[11])
507                    elif ver == 1:
508                        msg = msg + (row[8], row[9], row[10], 0)
509                    else:
510                        msg = msg + (0, 0, 0, 0)
511                    batch.append(msg, len(val))
512                else:
513                    if self.cursor_db:
514                        self.cursor_db[0].close()
515                        self.cursor_db[1].close()
516                    self.cursor_db = None
517
518            return 0, batch
519
520        except Exception, e:
521            self.done = True
522            if self.cursor_db:
523                self.cursor_db[0].close()
524                self.cursor_db[1].close()
525            self.cursor_db = None
526
527            return "error: exception reading backup file: " + str(e), None
528
529    @staticmethod
530    def total_msgs(opts, source_bucket, source_node, source_map):
531        t = 0
532        file_list = glob.glob(BFD.db_dir(
533                                 source_map['spec'],
534                                 source_bucket['name'],
535                                 source_node['hostname']) + "/data-*.cbb")
536        if not file_list:
537            #check 3.0 directory structure
538            rv, file_list = BFDSource.list_files(opts,
539                                        source_map['spec'],
540                                        source_bucket['name'],
541                                        source_node['hostname'],
542                                        "data-*.cbb")
543            if rv != 0:
544                return rv, None
545
546        for x in sorted(file_list):
547            rv, db, ver = connect_db(x, opts, CBB_VERSION)
548            if rv != 0:
549                return rv, None
550
551            cur = db.cursor()
552            # TODO: (1) BFDSource - COUNT(*) is not indexed.
553            cur.execute("SELECT COUNT(*) FROM cbb_msg;")
554            t = t + cur.fetchone()[0]
555            cur.close()
556            db.close()
557
558        return 0, t
559
560    @staticmethod
561    def list_files(opts, spec, bucket, node, pattern):
562        file_list = []
563        prec_list = []
564        path, dirs = BFD.find_latest_dir(spec, None)
565        if not path:
566            return "error: No valid data in path:" % spec, None
567
568        path, dirs = BFD.find_latest_dir(path, None)
569        if not path:
570            return 0, file_list
571
572        for dir in dirs:
573            latest_dir = BFD.construct_dir(dir, bucket, node)
574            file_list.extend(glob.glob(os.path.join(latest_dir, pattern)))
575            for p in BFD.get_predecessors(latest_dir):
576                prec_list.append(os.path.dirname(p))
577            if len(prec_list) > 0:
578                break
579
580        while len(prec_list) > 0:
581            deps = glob.glob(os.path.join(prec_list[0], pattern))
582            for d in glob.glob(os.path.join(prec_list[0], pattern)):
583                if d not in file_list:
584                    file_list.append(d)
585            for p in BFD.get_predecessors(prec_list[0]):
586                dirname = os.path.dirname(p)
587                if dirname not in prec_list:
588                    prec_list.append(dirname)
589            prec_list = prec_list[1:]
590
591        return 0, file_list
592
593# --------------------------------------------------
594
595class BFDSink(BFD, pump.Sink):
596    """Can write to backup-file/directory layout."""
597
598    def __init__(self, opts, spec, source_bucket, source_node,
599                 source_map, sink_map, ctl, cur):
600        super(BFDSink, self).__init__(opts, spec, source_bucket, source_node,
601                                      source_map, sink_map, ctl, cur)
602        self.mode = "full"
603        self.init_worker(BFDSink.run)
604
605    @staticmethod
606    def run(self):
607        """Worker thread to asynchronously store incoming batches into db."""
608        s = "INSERT INTO cbb_msg (cmd, vbucket_id, key, flg, exp, cas, meta, val, seqno, \
609            dtype, meta_size, conf_res) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"
610        db = None
611        cbb = 0       # Current cbb file NUM, like data-NUM.cbb.
612        cbb_bytes = 0 # Current cbb msg value bytes total.
613        db_dir = None
614        cbb_max_bytes = \
615            self.opts.extra.get("cbb_max_mb", 100000) * 1024 * 1024
616        _, dep, _, _ = BFD.find_seqno(self.opts,
617                                                  self.spec,
618                                                  self.bucket_name(),
619                                                  self.node_name(),
620                                                  self.mode)
621
622        confResType = "seqno"
623        if "conflictResolutionType" in self.source_bucket:
624            confResType = self.source_bucket["conflictResolutionType"]
625
626        version = "0.0.0"
627        for bucket in self.source_map["buckets"]:
628            if self.bucket_name() == bucket["name"]:
629                for node in bucket["nodes"]:
630                    if node["hostname"] == self.node_name():
631                        version = node["version"].split("-")[0]
632
633        seqno_map = {}
634        for i in range(BFD.NUM_VBUCKET):
635            seqno_map[i] = 0
636        while not self.ctl['stop']:
637            if db and cbb_bytes >= cbb_max_bytes:
638                db.close()
639                db = None
640                cbb += 1
641                cbb_bytes = 0
642                db_dir = None
643
644            if not db:
645                rv, db, db_dir = self.create_db(cbb)
646                if rv != 0:
647                    return self.future_done(future, rv)
648
649                meta_file = os.path.join(db_dir, "meta.json")
650                json_file = open(meta_file, "w")
651                toWrite = {'pred': dep,
652                           'conflict_resolution_type': confResType,
653                           'version': version}
654                json.dump(toWrite, json_file, ensure_ascii=False)
655                json_file.close()
656
657            batch, future = self.pull_next_batch()
658            if not batch:
659                if db:
660                    db.close()
661                return self.future_done(future, 0)
662
663            if (self.bucket_name(), self.node_name()) in self.cur['failoverlog']:
664                BFD.write_json_file(db_dir,
665                                    "failover.json",
666                                    self.cur['failoverlog'][(self.bucket_name(), self.node_name())])
667            if (self.bucket_name(), self.node_name()) in self.cur['snapshot']:
668                BFD.write_json_file(db_dir,
669                                    "snapshot_markers.json",
670                                    self.cur['snapshot'][(self.bucket_name(), self.node_name())])
671            try:
672                c = db.cursor()
673
674                for msg in batch.msgs:
675                    cmd, vbucket_id, key, flg, exp, cas, meta, val, seqno, dtype, nmeta, conf_res = msg
676                    if self.skip(key, vbucket_id):
677                        continue
678                    if cmd not in [couchbaseConstants.CMD_TAP_MUTATION,
679                                   couchbaseConstants.CMD_TAP_DELETE,
680                                   couchbaseConstants.CMD_DCP_MUTATION,
681                                   couchbaseConstants.CMD_DCP_DELETE]:
682                        if db:
683                            db.close()
684                        return self.future_done(future,
685                                                "error: BFDSink bad cmd: " +
686                                                str(cmd))
687                    c.execute(s, (cmd, vbucket_id,
688                                  sqlite3.Binary(key),
689                                  flg, exp, str(cas),
690                                  sqlite3.Binary(str(meta)),
691                                  sqlite3.Binary(val),
692                                  seqno,
693                                  dtype,
694                                  nmeta,
695                                  conf_res))
696                    cbb_bytes += len(val)
697                    if seqno_map[vbucket_id] < seqno:
698                        seqno_map[vbucket_id] = seqno
699                db.commit()
700                BFD.write_json_file(db_dir, "seqno.json", seqno_map)
701                self.future_done(future, 0) # No return to keep looping.
702
703            except sqlite3.Error, e:
704                return self.future_done(future, "error: db error: " + str(e))
705            except Exception, e:
706                return self.future_done(future, "error: db exception: " + str(e))
707
708    @staticmethod
709    def can_handle(opts, spec):
710        spec = os.path.normpath(spec)
711        return (os.path.isdir(spec) or (not os.path.exists(spec) and
712                                        os.path.isdir(os.path.dirname(spec))))
713
714    @staticmethod
715    def check(opts, spec, source_map):
716        # TODO: (2) BFDSink - check disk space.
717        # TODO: (2) BFDSink - check should report on pre-creatable directories.
718
719        spec = os.path.normpath(spec)
720
721        # Check that directory's empty.
722        if os.path.exists(spec):
723            if not os.path.isdir(spec):
724                return "error: backup directory is not a directory: " + spec, None
725            if not os.access(spec, os.W_OK):
726                return "error: backup directory is not writable: " + spec, None
727            return 0, None
728
729        # Or, that the parent directory exists.
730        parent_dir = os.path.dirname(spec)
731
732        if not os.path.exists(parent_dir):
733            return "error: missing parent directory: " + parent_dir, None
734        if not os.path.isdir(parent_dir):
735            return "error: parent directory is not a directory: " + parent_dir, None
736        if not os.access(parent_dir, os.W_OK):
737            return "error: parent directory is not writable: " + parent_dir, None
738        return 0, None
739
740    @staticmethod
741    def consume_design(opts, sink_spec, sink_map, source_bucket, source_map, index_defs):
742        return BFDSink.write_index_file(sink_spec, source_bucket, index_defs, DDOC_FILE_NAME)
743
744    @staticmethod
745    def consume_index(opts, sink_spec, sink_map, source_bucket, source_map, index_defs):
746        return BFDSink.write_index_file(sink_spec, source_bucket, index_defs, INDEX_FILE_NAME)
747
748    @staticmethod
749    def consume_fts_index(opts, sink_spec, sink_map, source_bucket, source_map, index_defs):
750        return BFDSink.write_index_file(sink_spec, source_bucket, index_defs, FTS_FILE_NAME)
751
752    @staticmethod
753    def write_index_file(sink_spec, source_bucket, index_defs, file_name):
754        if index_defs:
755            path = BFD.get_file_path(sink_spec, source_bucket['name'], file_name)
756            try:
757                rv = pump.mkdirs(path)
758                if rv:
759                    return rv, None
760                f = open(path, 'w')
761                f.write(index_defs)
762                f.close()
763            except IOError, e:
764                return ("error: could not write %s; exception: %s") % (path, e), None
765        return 0
766
767    def consume_batch_async(self, batch):
768        return self.push_next_batch(batch, pump.SinkBatchFuture(self, batch))
769
770    def create_db(self, num):
771        rv, dir = self.mkdirs()
772        if rv != 0:
773            return rv, None, None
774
775        path = dir + "/data-%s.cbb" % (string.rjust(str(num), 4, '0'))
776        rv, db = create_db(path, self.opts)
777        if rv != 0:
778            return rv, None, None
779
780        return 0, db, dir
781
782    def mkdirs(self):
783        """Make directories, if not already, with structure like...
784           <spec>/
785             YYYY-MM-DDThhmmssZ/
786                YYYY-MM-DDThhmmssZ-full /
787                   bucket-<BUCKETNAME>/
788                     design.json
789                     node-<NODE>/
790                       data-<XXXX>.cbb
791                YYYY-MM-DDThhmmssZ-diff/
792                   bucket-<BUCKETNAME>/
793                     design.json
794                     node-<NODE>/
795                       data-<XXXX>.cbb
796                   """
797        """CBSE-1052: There appears to be a race condition in os.mkdir. Suppose
798           more than two threads simultaneously try to create the same directory
799           or different directories with a common non-existent ancestor. Both check
800           the directory doesn't exists, then both invoke os.mkdir. One of these
801           will throw OSError due to underlying EEXIST system error."""
802
803        spec = os.path.normpath(self.spec)
804        if not os.path.isdir(spec):
805            try:
806                os.mkdir(spec)
807            except OSError as error:
808                if error.errno != errno.EEXIST:
809                    return "error: could not mkdir: %s; exception: %s" % (spec, error.strerror)
810
811        new_session = self.ctl['new_session']
812        self.ctl['new_session'] = False
813        d = BFD.db_dir(self.spec,
814                       self.bucket_name(),
815                       self.node_name(),
816                       self.ctl['new_timestamp'],
817                       getattr(self.opts, "mode", "diff"),
818                       new_session)
819        if not os.path.isdir(d):
820            try:
821                os.makedirs(d)
822            except OSError, e:
823                if not os.path.isdir(d):
824                    return "error: could not mkdirs: %s; exception: %s" % (d, e), None
825        return 0, d
826
827
828# --------------------------------------------------
829
830def create_db(db_path, opts):
831    try:
832        logging.debug("  create_db: " + db_path)
833
834        rv, db, ver = connect_db(db_path, opts, [0])
835        if rv != 0:
836            logging.debug("fail to call connect_db:" + db_path)
837            return rv, None
838
839        # The cas column is type text, not integer, because sqlite
840        # integer is 63-bits instead of 64-bits.
841        db.executescript("""
842                  BEGIN;
843                  CREATE TABLE cbb_msg
844                     (cmd integer,
845                      vbucket_id integer,
846                      key blob,
847                      flg integer,
848                      exp integer,
849                      cas text,
850                      meta blob,
851                      val blob,
852                      seqno integer,
853                      dtype integer,
854                      meta_size integer,
855                      conf_res integer);
856                  pragma user_version=%s;
857                  COMMIT;
858                """ % (CBB_VERSION[2]))
859
860        return 0, db
861
862    except Exception, e:
863        return "error: create_db exception: " + str(e), None
864
865def connect_db(db_path, opts, version):
866    try:
867        # TODO: (1) BFD - connect_db - use pragma page_size.
868        # TODO: (1) BFD - connect_db - use pragma max_page_count.
869        # TODO: (1) BFD - connect_db - use pragma journal_mode.
870
871        logging.debug("  connect_db: " + db_path)
872
873        db = sqlite3.connect(db_path)
874        db.text_factory = str
875
876        cur = db.execute("pragma user_version").fetchall()[0][0]
877        if cur not in version:
878            logging.debug("dbpath is not empty: " + db_path)
879            return "error: unexpected db user version: " + \
880                str(cur) + " vs " + str(version) + \
881                ", maybe a backup directory created by older releases is reused", \
882                None, None
883
884        return 0, db, version.index(cur)
885
886    except Exception, e:
887        return "error: connect_db exception: " + str(e), None
888
889def cleanse(d):
890    """Elide passwords from hierarchy of dict/list's."""
891    return cleanse_helper(copy.deepcopy(d))
892
893def cleanse_helper(d):
894    """Recursively, destructively elide passwords from hierarchy of dict/list's."""
895    if type(d) is list:
896        for x in d:
897            cleanse_helper(x)
898    elif type(d) is dict:
899        for k, v in d.iteritems():
900            if "assword" in k:
901                d[k] = '<...ELIDED...>'
902            else:
903                d[k] = cleanse_helper(v)
904    return d
905
906def recursive_glob(rootdir='.', pattern='*'):
907    return [os.path.join(rootdir, filename)
908            for rootdir, dirnames, filenames in os.walk(rootdir)
909            for filename in filenames
910            if fnmatch.fnmatch(filename, pattern)]
911
912def local_to_utc(t):
913    secs = time.mktime(t.timetuple())
914    return datetime.datetime.utcfromtimestamp(secs)
915