diff --git a/engine/clients/elasticsearch/config.py b/engine/clients/elasticsearch/config.py
index 19b59d74..024ca97b 100644
--- a/engine/clients/elasticsearch/config.py
+++ b/engine/clients/elasticsearch/config.py
@@ -1,4 +1,65 @@
-ELASTIC_PORT = 9200
-ELASTIC_INDEX = "bench"
-ELASTIC_USER = "elastic"
-ELASTIC_PASSWORD = "passwd"
+import os
+import time
+
+import urllib3
+from elasticsearch import ConnectionError as ESConnectionError
+from elasticsearch import Elasticsearch
+
+ELASTIC_PORT = int(os.getenv("ELASTIC_PORT", 9200))
+ELASTIC_INDEX = os.getenv("ELASTIC_INDEX", "bench")
+ELASTIC_USER = os.getenv("ELASTIC_USER", "elastic")
+ELASTIC_PASSWORD = os.getenv("ELASTIC_PASSWORD", "passwd")
+ELASTIC_TIMEOUT = int(os.getenv("ELASTIC_TIMEOUT", 300))
+ELASTIC_INDEX_TIMEOUT = os.getenv("ELASTIC_INDEX_TIMEOUT", "30m")
+ELASTIC_INDEX_REFRESH_INTERVAL = os.getenv("ELASTIC_INDEX_REFRESH_INTERVAL", "-1")
+urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
+
+
+def get_es_client(host, connection_params):
+    """Create an Elasticsearch client for ``host`` and verify it answers a ping.
+
+    ``connection_params`` override the default client options set below.
+    ``http://`` is prepended unless ``host`` already carries an ``http://`` or
+    ``https://`` scheme; ``ELASTIC_PORT`` is always appended.
+
+    Raises ``RuntimeError`` if the ping fails.
+    """
+    init_params = {
+        "verify_certs": False,
+        "request_timeout": ELASTIC_TIMEOUT,
+        "retry_on_timeout": True,
+        "ssl_show_warn": False,
+        **connection_params,
+    }
+    # Match the full scheme so a bare hostname such as "httpd-01" still gets one.
+    scheme = "" if host.startswith(("http://", "https://")) else "http://"
+    url = f"{scheme}{host}:{ELASTIC_PORT}"
+    client = Elasticsearch(
+        url,
+        basic_auth=(ELASTIC_USER, ELASTIC_PASSWORD),
+        **init_params,
+    )
+    # Raise explicitly: an ``assert`` would be stripped when running with ``-O``.
+    if not client.ping():
+        raise RuntimeError(f"Elasticsearch at {url} did not respond to ping")
+    return client
+
+
+def _wait_for_es_status(client, status="yellow"):
+    """Call the cluster health API with ``wait_for_status=status``.
+
+    The call is retried up to 100 times, 0.1 s apart, while the connection
+    fails. Returns ``client`` on success and raises ``Exception`` once the
+    retries are exhausted.
+    """
+    print(f"waiting for ES {status} status...")
+    for _ in range(100):
+        try:
+            client.cluster.health(wait_for_status=status)
+            return client
+        except (ConnectionError, ESConnectionError):
+            # The client's ConnectionError does not derive from the builtin one,
+            # so both are listed to make sure the retry actually triggers.
+            time.sleep(0.1)
+    # timeout
+    raise Exception("Elasticsearch failed to start.")
diff --git a/engine/clients/elasticsearch/configure.py b/engine/clients/elasticsearch/configure.py
index 76f64eb8..d2a09db3 100644
--- a/engine/clients/elasticsearch/configure.py
+++ b/engine/clients/elasticsearch/configure.py
@@ -1,4 +1,4 @@
-from elasticsearch import Elasticsearch, NotFoundError
+from elasticsearch import NotFoundError
 
 from benchmark.dataset import Dataset
 from engine.base_client import IncompatibilityError
@@ -6,9 +6,9 @@
 from engine.base_client.distances import Distance
 from engine.clients.elasticsearch.config import (
     ELASTIC_INDEX,
-    ELASTIC_PASSWORD,
-    ELASTIC_PORT,
-    ELASTIC_USER,
+    ELASTIC_INDEX_REFRESH_INTERVAL,
+    ELASTIC_INDEX_TIMEOUT,
+    get_es_client,
 )
 
 
@@ -25,27 +25,19 @@ class ElasticConfigurator(BaseConfigurator):
 
     def __init__(self, host, collection_params: dict, connection_params: dict):
         super().__init__(host, collection_params, connection_params)
-        init_params = {
-            **{
-                "verify_certs": False,
-                "request_timeout": 90,
-                "retry_on_timeout": True,
-            },
-            **connection_params,
-        }
-        self.client = Elasticsearch(
-            f"http://{host}:{ELASTIC_PORT}",
-            basic_auth=(ELASTIC_USER, ELASTIC_PASSWORD),
-            **init_params,
-        )
+        self.client = get_es_client(host, connection_params)
 
     def clean(self):
+        print("Ensuring the index does not exist...")
         try:
             self.client.indices.delete(
-                index=ELASTIC_INDEX, timeout="5m", master_timeout="5m"
+                index=ELASTIC_INDEX,
+                timeout=ELASTIC_INDEX_TIMEOUT,
+                master_timeout=ELASTIC_INDEX_TIMEOUT,
             )
         except NotFoundError:
             pass
+        print("Finished ensuring the index does not exist...")
 
     def recreate(self, dataset: Dataset, collection_params):
         if dataset.config.distance == Distance.DOT:
@@ -56,11 +48,14 @@ def recreate(self, dataset: Dataset, collection_params):
 
         self.client.indices.create(
             index=ELASTIC_INDEX,
+            timeout=ELASTIC_INDEX_TIMEOUT,
+            master_timeout=ELASTIC_INDEX_TIMEOUT,
+            wait_for_active_shards="all",
             settings={
                 "index": {
                     "number_of_shards": 1,
                     "number_of_replicas": 0,
-                    "refresh_interval": -1,
+                    "refresh_interval": ELASTIC_INDEX_REFRESH_INTERVAL,
                 }
             },
             mappings={
diff --git a/engine/clients/elasticsearch/search.py b/engine/clients/elasticsearch/search.py
index 29d20ec5..b7b09e2f 100644
--- a/engine/clients/elasticsearch/search.py
+++ b/engine/clients/elasticsearch/search.py
@@ -1,3 +1,4 @@
+import copy
 import multiprocessing as mp
 import uuid
 from typing import List, Tuple
@@ -5,12 +6,7 @@
 from elasticsearch import Elasticsearch
 
 from engine.base_client.search import BaseSearcher
-from engine.clients.elasticsearch.config import (
-    ELASTIC_INDEX,
-    ELASTIC_PASSWORD,
-    ELASTIC_PORT,
-    ELASTIC_USER,
-)
+from engine.clients.elasticsearch.config import ELASTIC_INDEX, get_es_client
 from engine.clients.elasticsearch.parser import ElasticConditionParser
 
 
@@ -38,12 +34,10 @@ def init_client(cls, host, distance, connection_params: dict, search_params: dic
             },
             **connection_params,
         }
-        cls.client: Elasticsearch = Elasticsearch(
-            f"http://{host}:{ELASTIC_PORT}",
-            basic_auth=(ELASTIC_USER, ELASTIC_PASSWORD),
-            **init_params,
-        )
-        cls.search_params = search_params
+        cls.client = get_es_client(host, connection_params)
+        cls.search_params = copy.deepcopy(search_params)
+        # Drop "parallel" from the private copy; the caller's dict is left intact.
+        cls.search_params.pop("parallel", "1")
 
     @classmethod
     def search_one(cls, vector, meta_conditions, top) -> List[Tuple[int, float]]:
diff --git a/engine/clients/elasticsearch/upload.py b/engine/clients/elasticsearch/upload.py
index 0d5c6f2b..7c3174ba 100644
--- a/engine/clients/elasticsearch/upload.py
+++ b/engine/clients/elasticsearch/upload.py
@@ -2,14 +2,14 @@
 import uuid
 from typing import List, Optional
 
-from elasticsearch import Elasticsearch
+import elastic_transport
+from elasticsearch import ApiError, Elasticsearch
 
 from engine.base_client.upload import BaseUploader
 from engine.clients.elasticsearch.config import (
     ELASTIC_INDEX,
-    ELASTIC_PASSWORD,
-    ELASTIC_PORT,
-    ELASTIC_USER,
+    _wait_for_es_status,
+    get_es_client,
 )
 
 
@@ -28,19 +28,7 @@ def get_mp_start_method(cls):
 
     @classmethod
     def init_client(cls, host, distance, connection_params, upload_params):
-        init_params = {
-            **{
-                "verify_certs": False,
-                "request_timeout": 90,
-                "retry_on_timeout": True,
-            },
-            **connection_params,
-        }
-        cls.client = Elasticsearch(
-            f"http://{host}:{ELASTIC_PORT}",
-            basic_auth=(ELASTIC_USER, ELASTIC_PASSWORD),
-            **init_params,
-        )
+        cls.client = get_es_client(host, connection_params)
         cls.upload_params = upload_params
 
     @classmethod
@@ -65,7 +53,23 @@ def upload_batch(
 
     @classmethod
     def post_upload(cls, _distance):
-        cls.client.indices.forcemerge(
-            index=ELASTIC_INDEX, wait_for_completion=True, max_num_segments=1
-        )
+        print("forcing the merge into 1 segment...")
+        tries = 30
+        for i in range(tries + 1):
+            try:
+                cls.client.indices.forcemerge(
+                    index=ELASTIC_INDEX, wait_for_completion=True, max_num_segments=1
+                )
+            except (elastic_transport.TlsError, ApiError) as e:
+                if i < tries:
+                    print(
+                        "Received the following error during retry {}/{} while waiting for ES index to be ready... {}".format(
+                            i, tries, e
+                        )
+                    )
+                    continue
+                else:
+                    raise
+            _wait_for_es_status(cls.client)
+            break
         return {}