55#
66
77import requests
8+ import urllib3
89import io
910import csv
1011import json
205206 ]
206207 }
207208
# Index-creation body for the employees test index: explicit mappings for every
# field, so the destination index does not rely on dynamic mapping.
# It is handed to _derive_with_mapping() by load(), which PUTs it as the JSON body
# of the new index before reindexing the employees data into it.
EMP_TEST_MAPPING = {
    "mappings": {
        "properties": {
            "birth_date": {
                "type": "date"
            },
            "emp_no": {
                "type": "integer"
            },
            # text field with an additional exact-match `keyword` sub-field
            "first_name": {
                "type": "text",
                "fields": {
                    "keyword": {
                        "type": "keyword",
                        "ignore_above": 256
                    }
                }
            },
            "gender": {
                "type": "keyword"
            },
            "hire_date": {
                "type": "date"
            },
            "languages": {
                "type": "integer"
            },
            # text field with an additional exact-match `keyword` sub-field
            "last_name": {
                "type": "text",
                "fields": {
                    "keyword": {
                        "type": "keyword",
                        "ignore_above": 256
                    }
                }
            },
            "salary": {
                "type": "long"
            }
        }
    }
}
252+
# Index-creation body for the library test index: explicit mappings for every
# field, so the destination index does not rely on dynamic mapping.
# It is handed to _derive_with_mapping() by load(), which PUTs it as the JSON body
# of the new index before reindexing the library data into it.
LIB_TEST_MAPPING = {
    "mappings": {
        "properties": {
            # text field with an additional exact-match `keyword` sub-field
            "author": {
                "type": "text",
                "fields": {
                    "keyword": {
                        "type": "keyword",
                        "ignore_above": 256
                    }
                }
            },
            # text field with an additional exact-match `keyword` sub-field
            "name": {
                "type": "text",
                "fields": {
                    "keyword": {
                        "type": "keyword",
                        "ignore_above": 256
                    }
                }
            },
            "page_count": {
                "type": "integer"
            },
            "release_date": {
                "type": "date"
            }
        }
    }
}
208284
209285ES_DATASET_BASE_URL = "https://raw.githubusercontent.com/elastic/elasticsearch/eda31b0ac00c952a52885902be59ac429b0ca81a/x-pack/plugin/sql/qa/src/main/resources/"
210286
@@ -266,8 +342,10 @@ class TestData(object):
266342 BATTERS_INDEX = "batters"
267343 LIBRARY_FILE = "library.csv"
268344 LIBRARY_INDEX = "library"
345+ LIB_TEST_INDEX = "test_lib"
269346 EMPLOYEES_FILE = "employees.csv"
270347 EMPLOYEES_INDEX = "employees"
348+ EMP_TEST_INDEX = "test_emp"
271349 PROTO_CASE_FILE = "SqlProtocolTestCase.java"
272350
273351
@@ -292,6 +370,8 @@ class TestData(object):
292370 MODE_INDEX : "indexed"
293371 }
294372
373+ _session = None
374+
295375 def __init__ (self , es , mode = MODE_INDEX , offline_dir = None ):
296376 self ._csv_md5 = {}
297377 self ._csv_header = {}
@@ -301,6 +381,10 @@ def __init__(self, es, mode=MODE_INDEX, offline_dir=None):
301381 self ._offline_dir = offline_dir
302382 self ._mode = mode
303383
384+ self ._req = requests .Session ()
385+ self ._req .verify = False
386+ urllib3 .disable_warnings (urllib3 .exceptions .InsecureRequestWarning )
387+
304388 def _csv_to_json_docs (self , csv_text ):
305389 stream = io .StringIO (csv_text )
306390 reader = csv .reader (stream , delimiter = ',' , quotechar = '"' )
@@ -374,24 +458,11 @@ def _get_csv_as_ndjson(self, base_url, csv_name, index_name):
374458 else :
375459 assert (base_url .endswith ("/" ))
376460 url = base_url + csv_name
377- req = requests .get (url , timeout = Elasticsearch .REQ_TIMEOUT )
461+ req = self . _req .get (url , timeout = Elasticsearch .REQ_TIMEOUT )
378462 if req .status_code != 200 :
379463 raise Exception ("failed to fetch %s with code %s" % (url , req .status_code ))
380464 return self ._csv_as_ndjson (req .text , req .encoding , index_name )
381465
382-
383- def _prepare_tableau_load (self , file_name , index_name , index_template ):
384- ndjson = self ._get_csv_as_ndjson (TABLEAU_DATASET_BASE_URL , file_name , index_name )
385-
386- if self .MODE_NOINDEX < self ._mode :
387- with requests .put ("%s/_template/%s_template" % (self ._es .base_url (), index_name ),
388- json = index_template , auth = self ._es .credentials ()) as req :
389- if req .status_code != 200 :
390- raise Exception ("PUT %s template failed with code: %s (content: %s)" % (index_name ,
391- req .status_code , req .text ))
392-
393- return ndjson
394-
395466 def _post_ndjson (self , ndjsons , index_name , pipeline_name = None ):
396467 print ("Indexing data for index '%s'." % index_name )
397468 url = "%s/%s/_bulk" % (self ._es .base_url (), index_name )
@@ -400,7 +471,7 @@ def _post_ndjson(self, ndjsons, index_name, pipeline_name=None):
400471 if type (ndjsons ) is not list :
401472 ndjsons = [ndjsons ]
402473 for n in ndjsons :
403- with requests .post (url , data = n , headers = {"Content-Type" : "application/x-ndjson" },
474+ with self . _req .post (url , data = n , headers = {"Content-Type" : "application/x-ndjson" },
404475 auth = self ._es .credentials ()) as req :
405476 if req .status_code not in [200 , 201 ]:
406477 raise Exception ("bulk POST to %s failed with code: %s (content: %s)" % (index_name ,
@@ -415,7 +486,7 @@ def _wait_for_results(self, index_name):
415486 waiting_since = time .time ()
416487 while hits < MIN_INDEXED_DOCS :
417488 url = "%s/%s/_search" % (self ._es .base_url (), index_name )
418- req = requests .get (url , timeout = Elasticsearch .REQ_TIMEOUT , auth = self ._es .credentials ())
489+ req = self . _req .get (url , timeout = Elasticsearch .REQ_TIMEOUT , auth = self ._es .credentials ())
419490 if req .status_code != 200 :
420491 raise Exception ("failed to _search %s: code: %s, body: %s" % (index_name , req .status_code , req .text ))
421492 answer = json .loads (req .text )
@@ -424,32 +495,48 @@ def _wait_for_results(self, index_name):
424495 if Elasticsearch .REQ_TIMEOUT < time .time () - waiting_since :
425496 raise Exception ("index '%s' has less than %s documents indexed" % (index_name , MIN_INDEXED_DOCS ))
426497
427- def _delete_if_needed (self , index_name ):
def _del_resource(self, url):
    """Send a DELETE request for `url` through the shared session.

    A 404 answer is accepted alongside 200, so removing a resource that does
    not exist is not an error.

    Raises:
        Exception: if the response status is neither 200 nor 404; the message
            carries the URL, the status code and the response body.
    """
    with self._req.delete(url, timeout=Elasticsearch.REQ_TIMEOUT, auth=self._es.credentials()) as resp:
        if resp.status_code not in (200, 404):
            raise Exception("Deleting %s failed; code=%s, body: %s." % (url, resp.status_code, resp.text))
502+
def _delete_if_needed(self, index_name, template=False, pipeline=False):
    """Drop the index `index_name` and, on request, its same-named companions.

    Only acts when the loader runs in MODE_REINDEX; in any other mode the call
    returns without touching the server.

    Args:
        index_name: name of the index to delete; also used as the name of the
            template and of the ingest pipeline.
        template: when true, also delete `_template/<index_name>`.
        pipeline: when true, also delete `_ingest/pipeline/<index_name>`.

    Raises:
        Exception: propagated from _del_resource() when a deletion fails.
    """
    if self._mode != self.MODE_REINDEX:
        return
    print("Deleting any old: index '%s'." % index_name)

    # The index itself always goes first; the optional resources follow in order.
    url_formats = ["%s/%s"]
    if template:
        url_formats.append("%s/_template/%s")
    if pipeline:
        url_formats.append("%s/_ingest/pipeline/%s")

    for url_format in url_formats:
        self._del_resource(url_format % (self._es.base_url(), index_name))
437518
def _load_tableau_sample(self, file_name, index_name, template, pipeline=None):
    """Install the template (and optional pipeline) for a Tableau sample and index its data.

    Nothing is done when the mode is MODE_NOINDEX or lower. Otherwise any old
    index/template/pipeline is removed first (effective in MODE_REINDEX only, see
    _delete_if_needed()), the template and pipeline are PUT under the index's own
    name, the CSV is fetched and bulk-posted, and the call waits for the documents
    to show up in search results.

    Args:
        file_name: CSV file name, resolved against TABLEAU_DATASET_BASE_URL.
        index_name: destination index; also names the template and the pipeline.
        template: JSON-serializable index template body.
        pipeline: optional JSON-serializable ingest pipeline body.

    Raises:
        Exception: if the template or pipeline PUT answers with a status other
            than 200, or if any of the called helpers fails.
    """
    if self._mode <= self.MODE_NOINDEX:
        return
    self._delete_if_needed(index_name, True, pipeline is not None)

    # Bound the request like the other template PUT in this file does, so that an
    # unresponsive server cannot block the load indefinitely.
    with self._req.put("%s/_template/%s" % (self._es.base_url(), index_name), json=template,
            auth=self._es.credentials(), timeout=Elasticsearch.REQ_TIMEOUT) as req:
        if req.status_code != 200:
            raise Exception("PUT %s template failed with code: %s (content: %s)" % (index_name,
                req.status_code, req.text))

    if pipeline:
        with self._req.put("%s/_ingest/pipeline/%s" % (self._es.base_url(), index_name), json=pipeline,
                auth=self._es.credentials(), timeout=Elasticsearch.REQ_TIMEOUT) as req:
            if req.status_code != 200:
                raise Exception("PUT %s pipeline failed with code: %s (content: %s) " % (index_name,
                    req.status_code, req.text))

    ndjsons = self._get_csv_as_ndjson(TABLEAU_DATASET_BASE_URL, file_name, index_name)
    # The pipeline was registered under the index's name, so that is the name to post with.
    self._post_ndjson(ndjsons, index_name, index_name if pipeline else None)
    self._wait_for_results(index_name)
453540
454541 def _load_elastic_sample (self , file_name , index_name ):
455542 ndjson = self ._get_csv_as_ndjson (ES_DATASET_BASE_URL , file_name , index_name )
@@ -458,6 +545,25 @@ def _load_elastic_sample(self, file_name, index_name):
458545 self ._post_ndjson (ndjson , index_name )
459546 self ._wait_for_results (index_name )
460547
def _derive_with_mapping(self, src_index, dst_index, mapping_json):
    """Create `dst_index` with an explicit mapping and reindex `src_index` into it.

    Nothing is done when the mode is below MODE_REINDEX. Any old destination
    index is removed through _delete_if_needed() (which only acts in
    MODE_REINDEX) before the index is created.

    Args:
        src_index: name of the already populated source index.
        dst_index: name of the index to create and fill.
        mapping_json: JSON-serializable index-creation body (the mappings).

    Raises:
        Exception: if creating the index or the reindex request answers with a
            status other than 200, or if deleting the old index fails.
    """
    if self._mode < self.MODE_REINDEX:
        return
    print("Reindexing '%s' into '%s'." % (src_index, dst_index))
    self._delete_if_needed(dst_index)

    # Bound the index creation like the other short requests in this file, so
    # that an unresponsive server cannot block the load indefinitely.
    with self._req.put("%s/%s" % (self._es.base_url(), dst_index), json=mapping_json,
            auth=self._es.credentials(), timeout=Elasticsearch.REQ_TIMEOUT) as req:
        if req.status_code != 200:
            raise Exception("PUT %s mapping failed with code: %s (content: %s) " % (dst_index,
                req.status_code, req.text))

    reindex_json = {"source": {"index": src_index}, "dest": {"index": dst_index}}
    # The request asks the server to answer only once reindexing has completed.
    # NOTE(review): no client timeout is set here since the duration depends on
    # the source size -- confirm whether REQ_TIMEOUT would be long enough.
    with self._req.post("%s/_reindex?wait_for_completion=true" % self._es.base_url(),
            json=reindex_json, auth=self._es.credentials()) as req:
        if req.status_code != 200:
            raise Exception("POST reindexing into %s failed with code: %s (content: %s) " % (dst_index,
                req.status_code, req.text))
566+
461567 def _get_kibana_file (self , sample_name , is_mapping = True ):
462568 print ("Fetching JS sample data for index '%s'." % sample_name )
463569 file_name = "field_mappings.js" if is_mapping else "%s.json.gz" % sample_name
@@ -468,7 +574,7 @@ def _get_kibana_file(self, sample_name, is_mapping=True):
468574 else :
469575 url = KIBANA_SAMPLES_BASE_URL + "/" + sample_name + "/"
470576 url += file_name
471- req = requests .get (url , timeout = Elasticsearch .REQ_TIMEOUT )
577+ req = self . _req .get (url , timeout = Elasticsearch .REQ_TIMEOUT )
472578 if req .status_code != 200 :
473579 raise Exception ("failed to GET URL %s for index %s with: code: %s, body: %s" %
474580 (url , sample_name , req .status_code , req .text ))
@@ -492,8 +598,8 @@ def _put_sample_template(self, sample_name, index_name):
492598 # turn it to JSON (to deal with trailing commas past last member on a level
493599 mapping = eval (mapping )
494600 # PUT the built template
495- url = "%s/_template/%s_template " % (self ._es .base_url (), index_name )
496- with requests .put (url , json = mapping , auth = self ._es .credentials (), timeout = Elasticsearch .REQ_TIMEOUT ) as req :
601+ url = "%s/_template/%s " % (self ._es .base_url (), index_name )
602+ with self . _req .put (url , json = mapping , auth = self ._es .credentials (), timeout = Elasticsearch .REQ_TIMEOUT ) as req :
497603 if req .status_code != 200 :
498604 raise Exception ("PUT %s template failed with code: %s (content: %s)" % (index_name ,
499605 req .status_code , req .text ))
@@ -522,7 +628,7 @@ def _load_proto_tests(self):
522628 case_src = f .read ()
523629 else :
524630 url = ES_PROTO_CASE_BASE_URL + "/" + self .PROTO_CASE_FILE
525- req = requests .get (url , timeout = Elasticsearch .REQ_TIMEOUT )
631+ req = self . _req .get (url , timeout = Elasticsearch .REQ_TIMEOUT )
526632 if req .status_code != 200 :
527633 raise Exception ("failed to fetch %s with code %s" % (url , req .status_code ))
528634 case_src = req .text
@@ -550,7 +656,9 @@ def load(self):
550656 self ._load_tableau_sample (self .BATTERS_FILE , self .BATTERS_INDEX , BATTERS_TEMPLATE , BATTERS_PIPELINE )
551657
552658 self ._load_elastic_sample (self .LIBRARY_FILE , self .LIBRARY_INDEX )
659+ self ._derive_with_mapping (self .LIBRARY_INDEX , self .LIB_TEST_INDEX , LIB_TEST_MAPPING )
553660 self ._load_elastic_sample (self .EMPLOYEES_FILE , self .EMPLOYEES_INDEX )
661+ self ._derive_with_mapping (self .EMPLOYEES_INDEX , self .EMP_TEST_INDEX , EMP_TEST_MAPPING )
554662
555663 self ._load_kibana_sample (self .ECOMMERCE_INDEX )
556664 self ._load_kibana_sample (self .FLIGHTS_INDEX )
0 commit comments