Skip to content
56 changes: 52 additions & 4 deletions engine/clients/elasticsearch/config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,52 @@
ELASTIC_PORT = 9200
ELASTIC_INDEX = "bench"
ELASTIC_USER = "elastic"
ELASTIC_PASSWORD = "passwd"
import os
import time

import urllib3
from elasticsearch import Elasticsearch

# Connection settings; each one can be overridden through an environment
# variable of the same name.
ELASTIC_PORT = int(os.environ.get("ELASTIC_PORT", 9200))
ELASTIC_INDEX = os.environ.get("ELASTIC_INDEX", "bench")
ELASTIC_USER = os.environ.get("ELASTIC_USER", "elastic")
ELASTIC_PASSWORD = os.environ.get("ELASTIC_PASSWORD", "passwd")

# Passed to the Elasticsearch client as its ``request_timeout``.
ELASTIC_TIMEOUT = int(os.environ.get("ELASTIC_TIMEOUT", 300))

# Kept as strings: they are forwarded verbatim to the index APIs
# (``timeout`` / ``master_timeout`` and the ``refresh_interval`` setting).
ELASTIC_INDEX_TIMEOUT = os.environ.get("ELASTIC_INDEX_TIMEOUT", "30m")
ELASTIC_INDEX_REFRESH_INTERVAL = os.environ.get("ELASTIC_INDEX_REFRESH_INTERVAL", "-1")
# Silence urllib3's InsecureRequestWarning: get_es_client() defaults to
# verify_certs=False, which would otherwise emit this warning.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


def get_es_client(host, connection_params):
    """Create an Elasticsearch client for ``host`` and check that it answers a ping.

    Args:
        host: Host name or address. If it does not already start with an
            ``http://`` or ``https://`` scheme, ``http://`` is prepended.
            ``ELASTIC_PORT`` is always appended.
        connection_params: Extra keyword arguments for the ``Elasticsearch``
            constructor. They override the defaults set here. ``None`` is
            treated as an empty mapping.

    Returns:
        The constructed ``Elasticsearch`` client.

    Raises:
        AssertionError: If ``client.ping()`` returns a falsy value.
    """
    init_params = {
        "verify_certs": False,
        "request_timeout": ELASTIC_TIMEOUT,
        "retry_on_timeout": True,
        "ssl_show_warn": False,
        **(connection_params or {}),
    }

    # Match a real scheme prefix, so a bare host name that merely begins with
    # "http" (e.g. "httpd-node") still gets a scheme.
    scheme = "" if host.startswith(("http://", "https://")) else "http://"
    url = f"{scheme}{host}:{ELASTIC_PORT}"

    client = Elasticsearch(
        url,
        basic_auth=(ELASTIC_USER, ELASTIC_PASSWORD),
        **init_params,
    )

    # An explicit raise instead of `assert`, which is stripped under
    # `python -O`. AssertionError is kept so callers see the same exception type.
    if not client.ping():
        raise AssertionError(f"Elasticsearch at {url} did not respond to ping")
    return client


def _wait_for_es_status(client, status="yellow"):
    """Wait until ``client.cluster.health(wait_for_status=status)`` succeeds.

    The health call is attempted up to 100 times, sleeping 0.1 s after each
    connection failure. Any other exception propagates immediately.

    Args:
        client: Elasticsearch client to query.
        status: Value passed as ``wait_for_status`` to the health API.

    Returns:
        The same ``client``, once a health call has completed without a
        connection error.

    Raises:
        Exception: If every attempt failed with a connection error.
    """
    # The elasticsearch client raises its own ConnectionError, which is not a
    # subclass of the builtin one; both are listed so the retry actually fires.
    from elasticsearch import ConnectionError as ESConnectionError

    print(f"waiting for ES {status} status...")
    for _ in range(100):
        try:
            client.cluster.health(wait_for_status=status)
        except (ConnectionError, ESConnectionError):
            time.sleep(0.1)
        else:
            return client
    # timeout
    raise Exception("Elasticsearch failed to start.")
33 changes: 14 additions & 19 deletions engine/clients/elasticsearch/configure.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
from elasticsearch import Elasticsearch, NotFoundError
from elasticsearch import NotFoundError

from benchmark.dataset import Dataset
from engine.base_client import IncompatibilityError
from engine.base_client.configure import BaseConfigurator
from engine.base_client.distances import Distance
from engine.clients.elasticsearch.config import (
ELASTIC_INDEX,
ELASTIC_PASSWORD,
ELASTIC_PORT,
ELASTIC_USER,
ELASTIC_INDEX_REFRESH_INTERVAL,
ELASTIC_INDEX_TIMEOUT,
get_es_client,
)


Expand All @@ -25,27 +25,19 @@ class ElasticConfigurator(BaseConfigurator):

def __init__(self, host, collection_params: dict, connection_params: dict):
# Initialise the base configurator and attach an Elasticsearch client for `host`.
# NOTE(review): this span appears to interleave both sides of a diff. The inline
# `Elasticsearch(...)` construction (request_timeout=90) looks like the removed
# side, and the final get_es_client() call looks like its replacement. Read
# top to bottom, the last assignment to self.client is the one that sticks.
super().__init__(host, collection_params, connection_params)
init_params = {
**{
"verify_certs": False,
"request_timeout": 90,
"retry_on_timeout": True,
},
**connection_params,
}
self.client = Elasticsearch(
f"http://{host}:{ELASTIC_PORT}",
basic_auth=(ELASTIC_USER, ELASTIC_PASSWORD),
**init_params,
)
# Shared factory from config.py; receives the caller's connection_params unchanged.
self.client = get_es_client(host, connection_params)

def clean(self):
# Delete the ELASTIC_INDEX index; a NotFoundError from the delete call is ignored.
print("Ensuring the index does not exist...")
try:
self.client.indices.delete(
# NOTE(review): the next line (literal "5m" timeouts) and the three lines after
# it (ELASTIC_INDEX_TIMEOUT) look like the removed and added sides of a diff.
# Only one set of keyword arguments can remain in the call.
index=ELASTIC_INDEX, timeout="5m", master_timeout="5m"
index=ELASTIC_INDEX,
timeout=ELASTIC_INDEX_TIMEOUT,
master_timeout=ELASTIC_INDEX_TIMEOUT,
)
except NotFoundError:
# Nothing to delete: the index was already absent.
pass
print("Finished ensuring the index does not exist...")

def recreate(self, dataset: Dataset, collection_params):
if dataset.config.distance == Distance.DOT:
Expand All @@ -56,11 +48,14 @@ def recreate(self, dataset: Dataset, collection_params):

self.client.indices.create(
index=ELASTIC_INDEX,
timeout=ELASTIC_INDEX_TIMEOUT,
master_timeout=ELASTIC_INDEX_TIMEOUT,
wait_for_active_shards="all",
settings={
"index": {
"number_of_shards": 1,
"number_of_replicas": 0,
"refresh_interval": -1,
"refresh_interval": ELASTIC_INDEX_REFRESH_INTERVAL,
}
},
mappings={
Expand Down
18 changes: 6 additions & 12 deletions engine/clients/elasticsearch/search.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
import copy
import multiprocessing as mp
import uuid
from typing import List, Tuple

from elasticsearch import Elasticsearch

from engine.base_client.search import BaseSearcher
from engine.clients.elasticsearch.config import (
ELASTIC_INDEX,
ELASTIC_PASSWORD,
ELASTIC_PORT,
ELASTIC_USER,
)
from engine.clients.elasticsearch.config import ELASTIC_INDEX, get_es_client
from engine.clients.elasticsearch.parser import ElasticConditionParser


Expand Down Expand Up @@ -38,12 +34,10 @@ def init_client(cls, host, distance, connection_params: dict, search_params: dic
},
**connection_params,
}
cls.client: Elasticsearch = Elasticsearch(
f"http://{host}:{ELASTIC_PORT}",
basic_auth=(ELASTIC_USER, ELASTIC_PASSWORD),
**init_params,
)
cls.search_params = search_params
cls.client = get_es_client(host, connection_params)
cls.search_params = copy.deepcopy(search_params)
# pop parallel
cls.search_params.pop("parallel", "1")

@classmethod
def search_one(cls, vector, meta_conditions, top) -> List[Tuple[int, float]]:
Expand Down
44 changes: 24 additions & 20 deletions engine/clients/elasticsearch/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@
import uuid
from typing import List, Optional

from elasticsearch import Elasticsearch
import elastic_transport
from elasticsearch import ApiError, Elasticsearch

from engine.base_client.upload import BaseUploader
from engine.clients.elasticsearch.config import (
ELASTIC_INDEX,
ELASTIC_PASSWORD,
ELASTIC_PORT,
ELASTIC_USER,
_wait_for_es_status,
get_es_client,
)


Expand All @@ -28,19 +28,7 @@ def get_mp_start_method(cls):

@classmethod
def init_client(cls, host, distance, connection_params, upload_params):
# Store an Elasticsearch client and the upload parameters on the class.
# `distance` is not used in the lines shown here.
# NOTE(review): this span appears to interleave both sides of a diff. The inline
# `Elasticsearch(...)` construction (request_timeout=90) looks like the removed
# side, and the get_es_client() call looks like its replacement. Read top to
# bottom, the last assignment to cls.client is the one that sticks.
init_params = {
**{
"verify_certs": False,
"request_timeout": 90,
"retry_on_timeout": True,
},
**connection_params,
}
cls.client = Elasticsearch(
f"http://{host}:{ELASTIC_PORT}",
basic_auth=(ELASTIC_USER, ELASTIC_PASSWORD),
**init_params,
)
# Shared factory from config.py; receives the caller's connection_params unchanged.
cls.client = get_es_client(host, connection_params)
cls.upload_params = upload_params

@classmethod
Expand All @@ -65,7 +53,23 @@ def upload_batch(

@classmethod
def post_upload(cls, _distance):
# Force-merge ELASTIC_INDEX down to a single segment after upload, then return
# an empty dict. `_distance` is unused.
# NOTE(review): the first forcemerge call below (before the print) looks like the
# removed side of a diff; the retry loop that follows looks like its replacement.
cls.client.indices.forcemerge(
index=ELASTIC_INDEX, wait_for_completion=True, max_num_segments=1
)
print("forcing the merge into 1 segment...")
tries = 30
# range(tries + 1): the forcemerge is attempted up to tries + 1 times in total.
for i in range(tries + 1):
try:
cls.client.indices.forcemerge(
index=ELASTIC_INDEX, wait_for_completion=True, max_num_segments=1
)
except (elastic_transport.TlsError, ApiError) as e:
# Retry on TLS/API errors; the error is re-raised once the last attempt fails.
# NOTE(review): there is no delay between attempts — confirm this is intended.
if i < tries:
print(
"Received the following error during retry {}/{} while waiting for ES index to be ready... {}".format(
i, tries, e.__str__()
)
)
continue
else:
raise
# Reached only when the forcemerge above did not raise: wait for cluster
# health (default status "yellow"), then stop retrying.
_wait_for_es_status(cls.client)
break
return {}