import json
import socket

import httplib2

from tasks.taskmanager import TaskManager
from tasks.task import *
5
class BLEVE:
    """
    Elasticsearch analysis settings used to mirror FTS (bleve) analyzers.

    STOPWORDS               -- English stopword list shared by the settings
                               below.
    STD_ANALYZER            -- index "settings" whose default analyzer is the
                               ES "standard" analyzer with STOPWORDS.
    CUSTOM_ANALYZER         -- template index "settings" defining the custom
                               char filter, tokenizer and token filters that
                               FTS custom analyzers are translated into. Its
                               "analyzer" section is empty here and is filled
                               in by ElasticSearchBase.populate_es_settings().
    FTS_ES_ANALYZER_MAPPING -- FTS component name -> ES component name, for
                               char filters, token filters and tokenizers.
    """
    # English stopwords. The same list object is referenced by STD_ANALYZER
    # and by the "keyword_marker" and "stopwords" filters of CUSTOM_ANALYZER.
    STOPWORDS = ['i', 'me', 'my', 'myself', 'we', 'our', 'ours', 'ourselves',
                 'you', 'your', 'yours', 'yourself', 'yourselves', 'he', 'him',
                 'his', 'himself', 'she', 'her', 'hers', 'herself', 'it', 'its',
                 'itself', 'they', 'them', 'their', 'theirs', 'themselves',
                 'what', 'which', 'who', 'whom', 'this', 'that', 'these',
                 'those', 'am', 'is', 'are', 'was', 'were', 'be', 'been',
                 'being', 'have', 'has', 'had', 'having', 'do', 'does', 'did',
                 'doing', 'would', 'should', 'could', 'ought', "i'm", "you're",
                 "he's", "she's", "it's", "we're", "they're", "i've", "you've",
                 "we've", "they've", "i'd", "you'd", "he'd", "she'd", "we'd",
                 "they'd", "i'll", "you'll", "he'll", "she'll", "we'll",
                 "they'll", "isn't", "aren't", "wasn't", "weren't", "hasn't",
                 "haven't", "hadn't", "doesn't", "don't", "didn't", "won't",
                 "wouldn't", "shan't", "shouldn't", "can't", 'cannot',
                 "couldn't", "mustn't", "let's", "that's", "who's", "what's",
                 "here's", "there's", "when's", "where's", "why's", "how's",
                 'a', 'an', 'the', 'and', 'but', 'if', 'or', 'because', 'as',
                 'until', 'while', 'of', 'at', 'by', 'for', 'with', 'about',
                 'against', 'between', 'into', 'through', 'during', 'before',
                 'after', 'above', 'below', 'to', 'from', 'up', 'down', 'in',
                 'out', 'on', 'off', 'over', 'under', 'again', 'further',
                 'then', 'once', 'here', 'there', 'when', 'where', 'why',
                 'how', 'all', 'any', 'both', 'each', 'few', 'more', 'most',
                 'other', 'some', 'such', 'no', 'nor', 'not', 'only', 'own',
                 'same', 'so', 'than', 'too', 'very']

    # Index settings whose "default" analyzer is ES "standard" with the
    # stopword list above. The whole dict is sent as an index PUT body by
    # create_empty_index_with_bleve_equivalent_std_analyzer(); its "settings"
    # part is reused by create_index_mapping() when no FTS mapping is given.
    STD_ANALYZER = {
        "settings": {
            "analysis": {
                "analyzer": {
                    "default": {
                        "type":      "standard",
                        "stopwords": STOPWORDS
                    }
                }
            }
        }
    }

    # Template for translated FTS custom analyzers. The named char filter,
    # tokenizer and token filters below are what FTS_ES_ANALYZER_MAPPING
    # points at; the per-index analyzers themselves go into "analyzer".
    CUSTOM_ANALYZER = {
        "settings": {
            "analysis": {
                "analyzer": {
                    # Left empty; populated by populate_es_settings().
                },
                "char_filter": {
                    # Rewrites "f" to "ph" before tokenization.
                    "mapping": {
                        "type": "mapping",
                        "mappings": [
                            "f => ph"
                        ]
                    }
                },
                "tokenizer":{
                    # Pattern tokenizer: the regex matches the separators,
                    # i.e. it splits on anything outside [a-zA-Z0-9_].
                    "alphanumeric":{
                        "type":"pattern",
                        "pattern":"[^a-zA-Z0-9_]"
                    }
                },
                "filter": {
                    # NOTE(review): "side": "back" for edge n-grams is
                    # deprecated/removed in newer ES releases, and the
                    # camelCase "edgeNGram"/"nGram" type names are deprecated
                    # too -- confirm against the ES version under test.
                    "back_edge_ngram": {
                        "type":"edgeNGram",
                        "min_gram":3,
                        "max_gram":5,
                        "side":"back"
                    },
                    "front_edge_ngram": {
                        "type": "edgeNGram",
                        "min_gram": 3,
                        "max_gram": 5,
                        "side": "front"
                    },
                    # NOTE(review): "side" is an edge n-gram option; it looks
                    # meaningless for the plain nGram filter -- confirm ES
                    # ignores it.
                    "ngram": {
                        "type": "nGram",
                        "min_gram": 3,
                        "max_gram": 5,
                        "side": "front"
                    },
                    "keyword_marker": {
                        "type":"keyword_marker",
                        "keywords":STOPWORDS
                    },
                    "stopwords": {
                        "type":"stop",
                        "stopwords":STOPWORDS
                    },
                    # Keeps only tokens whose length is between 3 and 5.
                    "length": {
                        "type":"length",
                        "min":3,
                        "max":5
                    },
                    # 2..5-token shingles joined with no separator; unigrams
                    # are not emitted (string "false" values).
                    "shingle": {
                        "type":"shingle",
                        "max_shingle_size":5,
                        "min_shingle_size":2,
                        "output_unigrams":"false",
                        "output_unigrams_if_no_shingles":"false",
                        "token_separator":"",
                        "filler_token":""
                    },
                    "truncate": {
                        "length": 10,
                        "type": "truncate"
                    },
                    "cjk_bigram": {
                        "type": "cjk_bigram"
                    },
                    "stemmer_it_light": {
                        "type": "stemmer",
                        "name": "light_italian"
                    },
                    "stemmer_fr_light": {
                        "type": "stemmer",
                        "name": "light_french"
                    },
                    "stemmer_fr_min": {
                        "type": "stemmer",
                        "name": "minimal_french"
                    },
                    "stemmer_pt_light": {
                        "type": "stemmer",
                        "name": "light_portuguese"
                    }
                }
            }
        }
    }

    # FTS (bleve) analyzer component name -> ES component name, consulted by
    # populate_es_settings(). Values that are keys of CUSTOM_ANALYZER's
    # "char_filter"/"tokenizer"/"filter" sections refer to those custom
    # definitions; the rest (e.g. "html_strip", "lowercase", "porter_stem",
    # "standard", "keyword") are used as ES built-in component names.
    FTS_ES_ANALYZER_MAPPING = {
        "char_filters" : {
            "html":"html_strip",
            # NOTE(review): html_strip does not look like an exact equivalent
            # of a zero-width-space filter -- confirm this is intentional.
            "zero_width_spaces":"html_strip",
            "mapping":"mapping"
        },
        "token_filters": {
            "apostrophe":"apostrophe",
            "elision_fr":"elision",
            "to_lower":"lowercase",
            "ngram":"ngram",
            "back_edge_ngram":"back_edge_ngram",
            "front_edge_ngram": "front_edge_ngram",
            "length":"length",
            "shingle":"shingle",
            "stemmer_porter":"porter_stem",
            "truncate":"truncate",
            "keyword_marker":"keyword_marker",
            "stopwords":"stopwords",
            "cjk_width":"cjk_width",
            "cjk_bigram":"cjk_bigram",
            "stemmer_it_light":"stemmer_it_light",
            "stemmer_fr_light":"stemmer_fr_light",
            "stemmer_fr_min": "stemmer_fr_min",
            "stemmer_pt_light": "stemmer_pt_light"
        },
        "tokenizers": {
            "letter":"letter",
            "web":"uax_url_email",
            "whitespace":"whitespace",
            "unicode":"standard",
            "single":"keyword",
            "alphanumeric":"alphanumeric"
        }
    }
169
170class ElasticSearchBase(object):
171
172    def __init__(self, host, logger):
173        #host is in the form IP address
174        self.__log = logger
175        self.__host = host
176        self.__document = {}
177        self.__mapping = {}
178        self.__STATUSOK = 200
179        self.__indices = []
180        self.__index_types = {}
181        self.__connection_url = 'http://{0}:{1}/'.format(self.__host.ip,
182                                                        self.__host.port)
183        self.es_queries = []
184        self.task_manager = TaskManager("ES_Thread")
185        self.task_manager.start()
186        self.http = httplib2.Http
187
188    def _http_request(self, api, method='GET', params='', headers=None,
189                      timeout=30):
190        if not headers:
191            headers = {'Content-Type': 'application/json',
192                       'Accept': '*/*'}
193        try:
194            response, content = httplib2.Http(timeout=timeout).request(api,
195                                                                       method,
196                                                                       params,
197                                                                       headers)
198            if response['status'] in ['200', '201', '202']:
199                return True, content, response
200            else:
201                try:
202                    json_parsed = json.loads(content)
203                except ValueError as e:
204                    json_parsed = {}
205                    json_parsed["error"] = "status: {0}, content: {1}".\
206                        format(response['status'], content)
207                reason = "unknown"
208                if "error" in json_parsed:
209                    reason = json_parsed["error"]
210                self.__log.error('{0} error {1} reason: {2} {3}'.format(
211                    api,
212                    response['status'],
213                    reason,
214                    content.rstrip('\n')))
215                return False, content, response
216        except socket.error as e:
217            self.__log.error("socket error while connecting to {0} error {1} ".
218                             format(api, e))
219            raise ServerUnavailableException(ip=self.__host.ip)
220
221    def is_running(self):
222        """
223         make sure ES is up and running
224         check the service is running , if not abort the test
225        """
226
227        try:
228            status, content, _ = self._http_request(
229                self.__connection_url,
230                'GET')
231            if status:
232                return True
233            else:
234                return False
235        except Exception as e:
236            raise e
237
238    def delete_index(self, index_name):
239        """
240        Deletes index
241        """
242        try:
243            url = self.__connection_url + index_name
244            status, content, _ = self._http_request(url, 'DELETE')
245        except Exception as e:
246            raise e
247
248    def delete_indices(self):
249        """
250        Delete all indices present
251        """
252        for index_name in self.__indices:
253            self.delete_index(index_name)
254            self.__log.info("ES index %s deleted" % index_name)
255
256    def create_empty_index(self, index_name):
257        """
258        Creates an empty index, given the name
259        """
260        try:
261            self.delete_index(index_name)
262            status, content, _ = self._http_request(
263                self.__connection_url + index_name,
264                'PUT')
265            if status:
266                self.__indices.append(index_name)
267        except Exception as e:
268            raise Exception("Could not create ES index : %s" % e)
269
270    def create_empty_index_with_bleve_equivalent_std_analyzer(self, index_name):
271        """
272        Refer:
273        https://www.elastic.co/guide/en/elasticsearch/guide/current/
274        configuring-analyzers.html
275        """
276        try:
277            self.delete_index(index_name)
278            status, content, _ = self._http_request(
279                self.__connection_url + index_name,
280                'PUT', json.dumps(BLEVE.STD_ANALYZER))
281            if status:
282                self.__indices.append(index_name)
283        except Exception as e:
284            raise Exception("Could not create index with ES std analyzer : %s"
285                            % e)
286
287    def create_index_mapping(self, index_name, es_mapping, fts_mapping=None):
288        """
289        Creates a new default index, with the given mapping
290        """
291        self.delete_index(index_name)
292
293        if not fts_mapping:
294            map = {"mappings": es_mapping, "settings": BLEVE.STD_ANALYZER['settings']}
295        else :
296            # Find the ES equivalent char_filter, token_filter and tokenizer
297            es_settings = self.populate_es_settings(fts_mapping['params']
298                                                    ['mapping']['analysis']['analyzers'])
299
300            # Create an ES custom index definition
301            map = {"mappings": es_mapping, "settings": es_settings['settings']}
302
303        # Create ES index
304        try:
305            self.__log.info("Creating %s with mapping %s"
306                            % (index_name, json.dumps(map, indent=3)))
307            status, content, _ = self._http_request(
308                self.__connection_url + index_name,
309                'PUT',
310                json.dumps(map))
311            if status:
312                self.__log.info("SUCCESS: ES index created with above mapping")
313            else:
314                raise Exception("Could not create ES index")
315        except Exception as e:
316            raise Exception("Could not create ES index : %s" % e)
317
318    def populate_es_settings(self, fts_custom_analyzers_def):
319        """
320        Populates the custom analyzer defintion of the ES Index Definition.
321        Refers to the FTS Custom Analyzers definition and creates an
322            equivalent definition for each ES custom analyzer
323        :param fts_custom_analyzers_def: FTS Custom Analyzer Definition
324        :return:
325        """
326
327        num_custom_analyzers = len(fts_custom_analyzers_def)
328        n = 1
329        analyzer_map = {}
330        while n <= num_custom_analyzers:
331            customAnalyzerName = fts_custom_analyzers_def.keys()[n-1]
332            fts_char_filters = fts_custom_analyzers_def[customAnalyzerName]["char_filters"]
333            fts_tokenizer = fts_custom_analyzers_def[customAnalyzerName]["tokenizer"]
334            fts_token_filters = fts_custom_analyzers_def[customAnalyzerName]["token_filters"]
335
336            analyzer_map[customAnalyzerName] = {}
337            analyzer_map[customAnalyzerName]["char_filter"] = []
338            analyzer_map[customAnalyzerName]["filter"] = []
339            analyzer_map[customAnalyzerName]["tokenizer"] = ""
340
341            for fts_char_filter in fts_char_filters:
342                analyzer_map[customAnalyzerName]['char_filter'].append( \
343                    BLEVE.FTS_ES_ANALYZER_MAPPING['char_filters'][fts_char_filter])
344
345            analyzer_map[customAnalyzerName]['tokenizer'] = \
346                BLEVE.FTS_ES_ANALYZER_MAPPING['tokenizers'][fts_tokenizer]
347
348            for fts_token_filter in fts_token_filters:
349                analyzer_map[customAnalyzerName]['filter'].append( \
350                    BLEVE.FTS_ES_ANALYZER_MAPPING['token_filters'][fts_token_filter])
351
352            n += 1
353
354        analyzer = BLEVE.CUSTOM_ANALYZER
355        analyzer['settings']['analysis']['analyzer'] = analyzer_map
356        return analyzer
357
358    def create_alias(self, name, indexes):
359        """
360        @name: alias name
361        @indexes: list of target indexes
362        """
363        try:
364            self.__log.info("Checking if ES alias '{0}' exists...".format(name))
365            self.delete_index(name)
366            alias_info = {"actions": []}
367            for index in indexes:
368                alias_info['actions'].append({"add": {"index": index,
369                                                      "alias": name}})
370            self.__log.info("Creating ES alias '{0}' on {1}...".format(
371                name,
372                indexes))
373            status, content, _ = self._http_request(
374                self.__connection_url + "_aliases",
375                'POST',
376                json.dumps(alias_info))
377            if status:
378                self.__log.info("ES alias '{0}' created".format(name))
379                self.__indices.append(name)
380        except Exception as ex:
381            raise Exception("Could not create ES alias : %s" % ex)
382
383    def async_load_ES(self, index_name, gen, op_type='create'):
384        """
385        Asynchronously run query against FTS and ES and compare result
386        note: every task runs a single query
387        """
388
389        _task = ESLoadGeneratorTask(es_instance=self,
390                                    index_name=index_name,
391                                    generator=gen,
392                                    op_type=op_type)
393        self.task_manager.schedule(_task)
394        return _task
395
396    def async_bulk_load_ES(self, index_name, gen, op_type='create', batch=5000):
397        _task = ESBulkLoadGeneratorTask(es_instance=self,
398                                    index_name=index_name,
399                                    generator=gen,
400                                    op_type=op_type,
401                                    batch=batch)
402        self.task_manager.schedule(_task)
403        return _task
404
405    def load_bulk_data(self, filename):
406        """
407        Bulk load to ES from a file
408        curl -s -XPOST 172.23.105.25:9200/_bulk --data-binary @req
409        cat req:
410        { "index" : { "_index" : "default_es_index", "_type" : "aruna", "_id" : "1" } }
411        { "field1" : "value1" , "field2" : "value2"}
412        { "index" : { "_index" : "default_es_index", "_type" : "aruna", "_id" : "2" } }
413        { "field1" : "value1" , "field2" : "value2"}
414        """
415        try:
416            import os
417            url = self.__connection_url + "/_bulk"
418            data = open(filename, "rb").read()
419            status, content, _ = self._http_request(url,
420                                                    'POST',
421                                                    data)
422            return status
423        except Exception as e:
424            raise e
425
426    def load_data(self, index_name, document_json, doc_type, doc_id):
427        """
428        index_name : name of index into which the doc is loaded
429        document_json: json doc
430        doc_type : type of doc. Usually the '_type' field in the doc body
431        doc_id : document id
432        """
433        try:
434            url = self.__connection_url + index_name + '/' + doc_type + '/' +\
435                  doc_id
436            status, content, _ = self._http_request(url,
437                                                    'POST',
438                                                    document_json)
439        except Exception as e:
440            raise e
441
442    def update_index(self, index_name):
443        """
444        This procedure will refresh index when insert is performed .
445        Need to call this API to take search in effect.
446        :param index_name:
447        :return:
448        """
449        try:
450            status, content, _ = self._http_request(
451                self.__connection_url + index_name +'/_refresh',
452                'POST')
453        except Exception as e:
454            raise e
455
456    def search(self, index_name, query, result_size=1000000):
457        """
458           This function will be used for search . based on the query
459           :param index_name:
460           :param query:
461           :return: number of matches found, doc_ids and time taken
462        """
463        try:
464            doc_ids = []
465            url = self.__connection_url + index_name + '/_search?size='+ \
466                  str(result_size)
467            status, content, _ = self._http_request(
468                url,
469                'POST',
470                json.dumps(query))
471            if status:
472                content = json.loads(content)
473                for doc in content['hits']['hits']:
474                    doc_ids.append(doc['_id'])
475                return content['hits']['total'], doc_ids, content['took']
476        except Exception as e:
477            self.__log.error("Couldn't run query on ES: %s, reason : %s"
478                             % (json.dumps(query), e))
479            raise e
480
481    def get_index_count(self, index_name):
482        """
483         Returns count of docs in the index
484        """
485        try:
486            status, content, _ = self._http_request(
487                self.__connection_url + index_name + '/_count',
488                'POST')
489            if status:
490                return json.loads(content)['count']
491        except Exception as e:
492            raise e
493
494    def get_indices(self):
495        """
496        Return all the indices created
497        :return: List of all indices
498        """
499        return self.__indices
500