import copy
import json
import socket  # needed: _http_request handles socket.error (was missing, causing NameError)

import httplib2

from tasks.taskmanager import TaskManager
from tasks.task import *  # supplies ESLoadGeneratorTask, ESBulkLoadGeneratorTask,
                          # ServerUnavailableException (star import kept for compatibility)


class BLEVE:
    """
    Static definitions used to translate couchbase-FTS (bleve) analyzer
    configuration into equivalent Elasticsearch index settings.
    """

    # English stopword list mirroring bleve's default analyzer.
    STOPWORDS = ['i', 'me', 'my', 'myself', 'we', 'our', 'ours', 'ourselves',
                 'you', 'your', 'yours', 'yourself', 'yourselves', 'he', 'him',
                 'his', 'himself', 'she', 'her', 'hers', 'herself', 'it', 'its',
                 'itself', 'they', 'them', 'their', 'theirs', 'themselves',
                 'what', 'which', 'who', 'whom', 'this', 'that', 'these',
                 'those', 'am', 'is', 'are', 'was', 'were', 'be', 'been',
                 'being', 'have', 'has', 'had', 'having', 'do', 'does', 'did',
                 'doing', 'would', 'should', 'could', 'ought', "i'm", "you're",
                 "he's", "she's", "it's", "we're", "they're", "i've", "you've",
                 "we've", "they've", "i'd", "you'd", "he'd", "she'd", "we'd",
                 "they'd", "i'll", "you'll", "he'll", "she'll", "we'll",
                 "they'll", "isn't", "aren't", "wasn't", "weren't", "hasn't",
                 "haven't", "hadn't", "doesn't", "don't", "didn't", "won't",
                 "wouldn't", "shan't", "shouldn't", "can't", 'cannot',
                 "couldn't", "mustn't", "let's", "that's", "who's", "what's",
                 "here's", "there's", "when's", "where's", "why's", "how's",
                 'a', 'an', 'the', 'and', 'but', 'if', 'or', 'because', 'as',
                 'until', 'while', 'of', 'at', 'by', 'for', 'with', 'about',
                 'against', 'between', 'into', 'through', 'during', 'before',
                 'after', 'above', 'below', 'to', 'from', 'up', 'down', 'in',
                 'out', 'on', 'off', 'over', 'under', 'again', 'further',
                 'then', 'once', 'here', 'there', 'when', 'where', 'why',
                 'how', 'all', 'any', 'both', 'each', 'few', 'more', 'most',
                 'other', 'some', 'such', 'no', 'nor', 'not', 'only', 'own',
                 'same', 'so', 'than', 'too', 'very']

    # ES index settings equivalent to bleve's "standard" analyzer.
    STD_ANALYZER = {
        "settings": {
            "analysis": {
                "analyzer": {
                    "default": {
                        "type": "standard",
                        "stopwords": STOPWORDS
                    }
                }
            }
        }
    }

    # Template for custom-analyzer index settings. The "analyzer" section is
    # filled in per-index by populate_es_settings(); the char_filter,
    # tokenizer and filter sections predefine every building block an FTS
    # custom analyzer may reference.
    CUSTOM_ANALYZER = {
        "settings": {
            "analysis": {
                "analyzer": {
                },
                "char_filter": {
                    "mapping": {
                        "type": "mapping",
                        "mappings": [
                            "f => ph"
                        ]
                    }
                },
                "tokenizer": {
                    "alphanumeric": {
                        "type": "pattern",
                        "pattern": "[^a-zA-Z0-9_]"
                    }
                },
                "filter": {
                    "back_edge_ngram": {
                        "type": "edgeNGram",
                        "min_gram": 3,
                        "max_gram": 5,
                        "side": "back"
                    },
                    "front_edge_ngram": {
                        "type": "edgeNGram",
                        "min_gram": 3,
                        "max_gram": 5,
                        "side": "front"
                    },
                    "ngram": {
                        "type": "nGram",
                        "min_gram": 3,
                        "max_gram": 5,
                        "side": "front"
                    },
                    "keyword_marker": {
                        "type": "keyword_marker",
                        "keywords": STOPWORDS
                    },
                    "stopwords": {
                        "type": "stop",
                        "stopwords": STOPWORDS
                    },
                    "length": {
                        "type": "length",
                        "min": 3,
                        "max": 5
                    },
                    "shingle": {
                        "type": "shingle",
                        "max_shingle_size": 5,
                        "min_shingle_size": 2,
                        "output_unigrams": "false",
                        "output_unigrams_if_no_shingles": "false",
                        "token_separator": "",
                        "filler_token": ""
                    },
                    "truncate": {
                        "length": 10,
                        "type": "truncate"
                    },
                    "cjk_bigram": {
                        "type": "cjk_bigram"
                    },
                    "stemmer_it_light": {
                        "type": "stemmer",
                        "name": "light_italian"
                    },
                    "stemmer_fr_light": {
                        "type": "stemmer",
                        "name": "light_french"
                    },
                    "stemmer_fr_min": {
                        "type": "stemmer",
                        "name": "minimal_french"
                    },
                    "stemmer_pt_light": {
                        "type": "stemmer",
                        "name": "light_portuguese"
                    }
                }
            }
        }
    }

    # FTS component name -> ES component name, used by populate_es_settings().
    FTS_ES_ANALYZER_MAPPING = {
        "char_filters": {
            "html": "html_strip",
            "zero_width_spaces": "html_strip",
            "mapping": "mapping"
        },
        "token_filters": {
            "apostrophe": "apostrophe",
            "elision_fr": "elision",
            "to_lower": "lowercase",
            "ngram": "ngram",
            "back_edge_ngram": "back_edge_ngram",
            "front_edge_ngram": "front_edge_ngram",
            "length": "length",
            "shingle": "shingle",
            "stemmer_porter": "porter_stem",
            "truncate": "truncate",
            "keyword_marker": "keyword_marker",
            "stopwords": "stopwords",
            "cjk_width": "cjk_width",
            "cjk_bigram": "cjk_bigram",
            "stemmer_it_light": "stemmer_it_light",
            "stemmer_fr_light": "stemmer_fr_light",
            "stemmer_fr_min": "stemmer_fr_min",
            "stemmer_pt_light": "stemmer_pt_light"
        },
        "tokenizers": {
            "letter": "letter",
            "web": "uax_url_email",
            "whitespace": "whitespace",
            "unicode": "standard",
            "single": "keyword",
            "alphanumeric": "alphanumeric"
        }
    }


class ElasticSearchBase(object):
    """
    Thin REST client for an Elasticsearch node, used to mirror FTS indexes
    and compare query results. Tracks the indexes it creates so they can be
    bulk-deleted later.
    """

    def __init__(self, host, logger):
        # host is an object exposing .ip and .port (project type)
        self.__log = logger
        self.__host = host
        self.__document = {}
        self.__mapping = {}
        self.__STATUSOK = 200
        self.__indices = []          # names of indexes/aliases created here
        self.__index_types = {}
        self.__connection_url = 'http://{0}:{1}/'.format(self.__host.ip,
                                                         self.__host.port)
        self.es_queries = []
        self.task_manager = TaskManager("ES_Thread")
        self.task_manager.start()
        # NOTE(review): this stores the httplib2.Http *class*, not an
        # instance (no parentheses). Unused in this file; confirm whether
        # external callers rely on it before "fixing".
        self.http = httplib2.Http

    def _http_request(self, api, method='GET', params='', headers=None,
                      timeout=30):
        """
        Issue a single HTTP request against the ES node.

        :param api: full URL to hit
        :param method: HTTP verb
        :param params: request body (string/bytes)
        :param headers: optional header dict (defaults to JSON content type)
        :param timeout: socket timeout in seconds
        :return: (success_bool, response_body, response_headers)
        :raises ServerUnavailableException: on socket-level failure
        """
        if not headers:
            headers = {'Content-Type': 'application/json',
                       'Accept': '*/*'}
        try:
            response, content = httplib2.Http(timeout=timeout).request(api,
                                                                       method,
                                                                       params,
                                                                       headers)
            if response['status'] in ['200', '201', '202']:
                return True, content, response
            else:
                # Non-2xx: extract a reason from the JSON error body if any.
                try:
                    json_parsed = json.loads(content)
                except ValueError as e:
                    json_parsed = {}
                    json_parsed["error"] = "status: {0}, content: {1}".\
                        format(response['status'], content)
                reason = "unknown"
                if "error" in json_parsed:
                    reason = json_parsed["error"]
                self.__log.error('{0} error {1} reason: {2} {3}'.format(
                    api,
                    response['status'],
                    reason,
                    content.rstrip('\n')))
                return False, content, response
        except socket.error as e:
            self.__log.error("socket error while connecting to {0} error {1} ".
                             format(api, e))
            raise ServerUnavailableException(ip=self.__host.ip)

    def is_running(self):
        """
        make sure ES is up and running
        check the service is running , if not abort the test
        """
        status, content, _ = self._http_request(
            self.__connection_url,
            'GET')
        return bool(status)

    def delete_index(self, index_name):
        """
        Deletes index
        """
        url = self.__connection_url + index_name
        self._http_request(url, 'DELETE')

    def delete_indices(self):
        """
        Delete all indices present
        """
        for index_name in self.__indices:
            self.delete_index(index_name)
            self.__log.info("ES index %s deleted" % index_name)

    def create_empty_index(self, index_name):
        """
        Creates an empty index, given the name
        """
        try:
            self.delete_index(index_name)
            status, content, _ = self._http_request(
                self.__connection_url + index_name,
                'PUT')
            if status:
                self.__indices.append(index_name)
        except Exception as e:
            raise Exception("Could not create ES index : %s" % e)

    def create_empty_index_with_bleve_equivalent_std_analyzer(self, index_name):
        """
        Create an empty index configured with the ES equivalent of bleve's
        standard analyzer.
        Refer:
        https://www.elastic.co/guide/en/elasticsearch/guide/current/
        configuring-analyzers.html
        """
        try:
            self.delete_index(index_name)
            status, content, _ = self._http_request(
                self.__connection_url + index_name,
                'PUT', json.dumps(BLEVE.STD_ANALYZER))
            if status:
                self.__indices.append(index_name)
        except Exception as e:
            raise Exception("Could not create index with ES std analyzer : %s"
                            % e)

    def create_index_mapping(self, index_name, es_mapping, fts_mapping=None):
        """
        Creates a new default index, with the given mapping
        """
        self.delete_index(index_name)

        if not fts_mapping:
            # Default: standard analyzer settings.
            index_def = {"mappings": es_mapping,
                         "settings": BLEVE.STD_ANALYZER['settings']}
        else:
            # Find the ES equivalent char_filter, token_filter and tokenizer
            es_settings = self.populate_es_settings(
                fts_mapping['params']['mapping']['analysis']['analyzers'])

            # Create an ES custom index definition
            index_def = {"mappings": es_mapping,
                         "settings": es_settings['settings']}

        # Create ES index
        try:
            self.__log.info("Creating %s with mapping %s"
                            % (index_name, json.dumps(index_def, indent=3)))
            status, content, _ = self._http_request(
                self.__connection_url + index_name,
                'PUT',
                json.dumps(index_def))
            if status:
                self.__log.info("SUCCESS: ES index created with above mapping")
            else:
                raise Exception("Could not create ES index")
        except Exception as e:
            raise Exception("Could not create ES index : %s" % e)

    def populate_es_settings(self, fts_custom_analyzers_def):
        """
        Populates the custom analyzer definition of the ES Index Definition.
        Refers to the FTS Custom Analyzers definition and creates an
        equivalent definition for each ES custom analyzer.

        :param fts_custom_analyzers_def: FTS Custom Analyzer Definition
                                         (dict keyed by analyzer name)
        :return: ES settings dict with the translated analyzers filled in
        """
        analyzer_map = {}
        for analyzer_name, fts_def in fts_custom_analyzers_def.items():
            # Translate each FTS component name to its ES counterpart.
            analyzer_map[analyzer_name] = {
                "char_filter": [
                    BLEVE.FTS_ES_ANALYZER_MAPPING['char_filters'][cf]
                    for cf in fts_def["char_filters"]],
                "tokenizer":
                    BLEVE.FTS_ES_ANALYZER_MAPPING['tokenizers']
                    [fts_def["tokenizer"]],
                "filter": [
                    BLEVE.FTS_ES_ANALYZER_MAPPING['token_filters'][tf]
                    for tf in fts_def["token_filters"]],
            }

        # Deep-copy the template: mutating BLEVE.CUSTOM_ANALYZER directly
        # would leak analyzers from one index definition into the next.
        analyzer = copy.deepcopy(BLEVE.CUSTOM_ANALYZER)
        analyzer['settings']['analysis']['analyzer'] = analyzer_map
        return analyzer

    def create_alias(self, name, indexes):
        """
        @name: alias name
        @indexes: list of target indexes
        """
        try:
            self.__log.info("Checking if ES alias '{0}' exists...".format(name))
            self.delete_index(name)
            alias_info = {"actions": []}
            for index in indexes:
                alias_info['actions'].append({"add": {"index": index,
                                                      "alias": name}})
            self.__log.info("Creating ES alias '{0}' on {1}...".format(
                name,
                indexes))
            status, content, _ = self._http_request(
                self.__connection_url + "_aliases",
                'POST',
                json.dumps(alias_info))
            if status:
                self.__log.info("ES alias '{0}' created".format(name))
                self.__indices.append(name)
        except Exception as ex:
            raise Exception("Could not create ES alias : %s" % ex)

    def async_load_ES(self, index_name, gen, op_type='create'):
        """
        Asynchronously load docs into ES via the task manager.
        note: every task runs a single query
        """
        _task = ESLoadGeneratorTask(es_instance=self,
                                    index_name=index_name,
                                    generator=gen,
                                    op_type=op_type)
        self.task_manager.schedule(_task)
        return _task

    def async_bulk_load_ES(self, index_name, gen, op_type='create', batch=5000):
        """Asynchronously bulk-load docs into ES in batches."""
        _task = ESBulkLoadGeneratorTask(es_instance=self,
                                        index_name=index_name,
                                        generator=gen,
                                        op_type=op_type,
                                        batch=batch)
        self.task_manager.schedule(_task)
        return _task

    def load_bulk_data(self, filename):
        """
        Bulk load to ES from a file
        curl -s -XPOST 172.23.105.25:9200/_bulk --data-binary @req
        cat req:
        { "index" : { "_index" : "default_es_index", "_type" : "aruna", "_id" : "1" } }
        { "field1" : "value1" , "field2" : "value2"}
        { "index" : { "_index" : "default_es_index", "_type" : "aruna", "_id" : "2" } }
        { "field1" : "value1" , "field2" : "value2"}
        """
        url = self.__connection_url + "/_bulk"
        # 'with' guarantees the file handle is closed (was previously leaked)
        with open(filename, "rb") as payload:
            data = payload.read()
        status, content, _ = self._http_request(url,
                                                'POST',
                                                data)
        return status

    def load_data(self, index_name, document_json, doc_type, doc_id):
        """
        index_name : name of index into which the doc is loaded
        document_json: json doc
        doc_type : type of doc. Usually the '_type' field in the doc body
        doc_id : document id
        """
        url = self.__connection_url + index_name + '/' + doc_type + '/' + \
              doc_id
        self._http_request(url,
                           'POST',
                           document_json)

    def update_index(self, index_name):
        """
        This procedure will refresh index when insert is performed .
        Need to call this API to take search in effect.
        :param index_name:
        :return:
        """
        self._http_request(
            self.__connection_url + index_name + '/_refresh',
            'POST')

    def search(self, index_name, query, result_size=1000000):
        """
        This function will be used for search based on the query
        :param index_name:
        :param query:
        :return: number of matches found, doc_ids and time taken
        """
        try:
            doc_ids = []
            url = self.__connection_url + index_name + '/_search?size=' + \
                  str(result_size)
            status, content, _ = self._http_request(
                url,
                'POST',
                json.dumps(query))
            if status:
                content = json.loads(content)
                for doc in content['hits']['hits']:
                    doc_ids.append(doc['_id'])
                return content['hits']['total'], doc_ids, content['took']
        except Exception as e:
            self.__log.error("Couldn't run query on ES: %s, reason : %s"
                             % (json.dumps(query), e))
            raise e

    def get_index_count(self, index_name):
        """
        Returns count of docs in the index
        """
        status, content, _ = self._http_request(
            self.__connection_url + index_name + '/_count',
            'POST')
        if status:
            return json.loads(content)['count']

    def get_indices(self):
        """
        Return all the indices created
        :return: List of all indices
        """
        return self.__indices