xref: /4.6.0/couchbase-cli/pump_json.py (revision 4cf7b099)
1#!/usr/bin/env python
2
3import logging
4import os
5import json
6import struct
7import sys
8import shutil
9import tempfile
10import zipfile
11
12import couchbaseConstants
13import pump
14
# URL-style prefix that selects this JSON source, e.g. "json:///path/to/data.zip".
# The prefix is stripped from the spec (spec.replace(JSON_SCHEME, "")) to get
# the filesystem path of the JSON file, directory, or zip archive.
JSON_SCHEME = "json://"
16
class JSONSource(pump.Source):
    """Reads json file or directory or zip file that contains json files.

    Which files become documents:

    * zip archive: entries inside a ``docs`` directory, plus entries at the
      archive root or whose immediate parent folder is named after the
      archive (a tolerated deviation from the layout spec);
    * directory: files with a ``docs`` path component (``design_docs`` trees
      are skipped);
    * any other path: that single file.

    Design documents are read separately by ``provide_design`` from
    ``design_docs`` directories.
    """

    def __init__(self, opts, spec, source_bucket, source_node,
                 source_map, sink_map, ctl, cur):
        super(JSONSource, self).__init__(opts, spec, source_bucket, source_node,
                                         source_map, sink_map, ctl, cur)
        self.done = False      # True once every queued document has been emitted
        self.docs = list()     # paths still to be loaded; consumed from the end
        self.file_iter = None  # NOTE(review): not used in this class; kept for compatibility

    @staticmethod
    def can_handle(opts, spec):
        """Return True if *spec* is a ``json://`` spec this source can read.

        The path after the scheme must be an existing file or directory, or
        end in ``.zip`` (the zip case is accepted without an existence check).
        """
        return spec.startswith(JSON_SCHEME) and \
            (os.path.isfile(spec.replace(JSON_SCHEME, "")) or \
             os.path.isdir(spec.replace(JSON_SCHEME, "")) or \
             spec.endswith(".zip"))

    @staticmethod
    def check(opts, spec):
        """Describe the source as one bucket named after the spec's base name.

        Returns ``(0, info)`` where ``info`` lists a single bucket with one
        placeholder node whose hostname is ``'N/A'``.
        """
        return 0, {'spec': spec,
                   'buckets': [{'name': os.path.normpath(os.path.basename(spec)),
                                'nodes': [{'hostname': 'N/A'}]}]}

    @staticmethod
    def _is_zip_dir_entry(name):
        """Return True if zip member *name* is a directory entry."""
        # Zip archives mark directories with a trailing forward slash. Such
        # entries have no content, and os.path.basename() returns "" for them.
        return name.endswith("/")

    @staticmethod
    def _key_from_path(path):
        """Return the document key for *path*: its base name minus a trailing '.json'."""
        key = os.path.basename(path)
        if key.endswith('.json'):
            key = key[:-5]
        return key

    def save_doc(self, batch, dockey, docvalue):
        """Parse *docvalue* as JSON and append a mutation for it to *batch*.

        If the value is a JSON object with a non-empty string ``_id`` field,
        that field becomes the document key and is removed from the stored
        body. Otherwise the raw *docvalue* is stored unchanged under *dockey*.
        Values that are not valid JSON are logged and skipped (best effort).

        :param batch: pump batch the mutation tuple is appended to.
        :param dockey: key derived from the source file name.
        :param docvalue: raw file contents.
        """
        cmd = couchbaseConstants.CMD_TAP_MUTATION
        vbucket_id = 0x0000ffff
        # common flags:       0x02000000 (JSON)
        # legacy flags:       0x00000006 (JSON)
        cas, exp, flg = 0, 0, 0x02000006
        try:
            doc = json.loads(docvalue)
        except ValueError as error:
            logging.error("Fail to read json file with error:" + str(error))
            return

        key, value = dockey, docvalue
        # Only JSON objects can carry an _id. Guard against arrays and scalars,
        # where the membership test would be meaningless or raise TypeError.
        doc_id = doc.get('_id') if isinstance(doc, dict) else None
        if doc_id:
            try:
                new_key = doc_id.encode('UTF-8')
            except (AttributeError, UnicodeError):
                # A non-string _id (number, object, ...) cannot be used as a
                # key. Keep the file-derived key and store the body untouched.
                logging.warning("Ignoring non-string _id in document %s", dockey)
            else:
                # _id is the document key; it is not part of the stored body.
                del doc['_id']
                key, value = new_key, json.dumps(doc)

        msg = (cmd, vbucket_id, key, flg, exp, cas, '', value, 0, 0, 0, 0)
        batch.append(msg, len(value))

    @staticmethod
    def gen_dockey(filename):
        """Return *filename*'s base name with its last extension removed."""
        return os.path.splitext(os.path.basename(filename))[0]

    @staticmethod
    def enumerate_files(subdir, file_candidate, skip_views, skip_docs):
        """Recursively collect candidate files under *subdir* into *file_candidate*.

        A regular file is collected when one of its path components is
        ``design_docs`` (unless *skip_views*) or ``docs`` (unless *skip_docs*).
        A sub-directory is descended into unless its path contains an excluded
        component. Entries that are neither regular files nor directories,
        such as broken symlinks, are ignored.
        """
        for item in os.listdir(subdir):
            path = os.path.join(subdir, item)
            parts = path.split(os.path.sep)
            in_views = "design_docs" in parts
            in_docs = "docs" in parts
            if os.path.isfile(path):
                if (not skip_views and in_views) or (not skip_docs and in_docs):
                    file_candidate.append(path)
            elif os.path.isdir(path):
                if not ((skip_docs and in_docs) or (skip_views and in_views)):
                    JSONSource.enumerate_files(path, file_candidate, skip_views, skip_docs)

    @staticmethod
    def provide_design(opts, source_spec, source_bucket, source_map):
        """Return ``(0, contents)``: the raw text of every design-document file.

        For a zip archive, these are the members whose immediate parent
        directory is ``design_docs``. For a directory, they are the files
        under a ``design_docs`` path component.
        """
        design_files = list()
        f = source_spec.replace(JSON_SCHEME, "")

        if os.path.isfile(f) and f.endswith(".zip"):
            zf = zipfile.ZipFile(f)
            try:
                for path in zf.namelist():
                    # Skip directory entries, including the design_docs listing.
                    if JSONSource._is_zip_dir_entry(path) or \
                       os.path.basename(path) == "design_docs":
                        continue
                    # Skip all files not in the design docs directory.
                    if os.path.basename(os.path.dirname(path)) != "design_docs":
                        continue
                    design_files.append(zf.read(path))
            finally:
                zf.close()
        elif os.path.isdir(f):
            files = list()
            JSONSource.enumerate_files(f, files, False, True)
            for path in files:
                if os.path.isfile(path):
                    with open(path, 'r') as fp:
                        design_files.append(fp.read())

        return 0, design_files

    def provide_batch(self):
        """Return ``(0, batch)`` with the next documents, or ``(0, None)`` when done.

        Documents are loaded until ``batch.size()`` reaches
        ``opts.extra['batch_max_size']`` or no documents remain.
        """
        if self.done:
            return 0, None

        # During the first iteration load the file names, this is only run once
        if not self.docs:
            self.prepare_docs()

        batch = pump.Batch(self)
        f = self.spec.replace(JSON_SCHEME, "")
        batch_max_size = self.opts.extra['batch_max_size']

        # Each iteration should return a batch or mark the loading as finished
        if os.path.isfile(f) and f.endswith(".zip"):
            zf = zipfile.ZipFile(f)
            try:
                while batch.size() < batch_max_size and self.docs:
                    path = self.docs.pop()
                    self.save_doc(batch, self._key_from_path(path), zf.read(path))
            finally:
                zf.close()
        else:
            while batch.size() < batch_max_size and self.docs:
                path = self.docs.pop()
                try:
                    with open(path, 'r') as fp:
                        value = fp.read()
                except IOError as error:
                    # Best effort: log the unreadable file and keep going.
                    logging.error("Fail to load json file with error" + str(error))
                    continue
                self.save_doc(batch, self._key_from_path(path), value)

        if not self.docs:
            self.done = True

        return 0, batch

    def prepare_docs(self):
        """Fill ``self.docs`` with the paths (or zip member names) of all documents."""
        f = self.spec.replace(JSON_SCHEME, "")
        root_name = os.path.basename(f).split('.')[0]
        if os.path.isfile(f) and f.endswith(".zip"):
            zf = zipfile.ZipFile(f)
            try:
                for path in zf.namelist():
                    # Skip directory entries, including the docs listing. They
                    # have no content, and reading one yields an empty "document".
                    if self._is_zip_dir_entry(path) or os.path.basename(path) == "docs":
                        continue

                    dir_name = os.path.basename(os.path.dirname(path))

                    # This condition is not allowed by the spec, but we are
                    # allowing it because the training team did not properly
                    # follow the spec and we don't want to break their training
                    # material. Since this tool will be deprecated soon we are
                    # making an exception and allowing this.
                    if dir_name == "" or dir_name == root_name:
                        self.docs.append(path)
                        continue

                    # Skip all files not in the docs directory.
                    # Note that we use a forward slash for all operating systems
                    # because this is a zip file and zip paths always use a
                    # forward slash.
                    if "docs" not in path.split("/"):
                        continue

                    self.docs.append(path)
            finally:
                zf.close()
        elif os.path.isdir(f):
            JSONSource.enumerate_files(f, self.docs, True, False)
        else:
            self.docs.append(f)
190