From bf3f9ebd29152055b0e0a2e11df9c5a5df7b4d93 Mon Sep 17 00:00:00 2001 From: KShivendu Date: Thu, 11 Apr 2024 14:37:21 +0530 Subject: [PATCH 1/6] refactor: Nested search params in ES config Co-authored-by: filipe oliveira --- engine/clients/elasticsearch/config.py | 34 ++++++++++++++++--- engine/clients/elasticsearch/configure.py | 8 ++--- engine/clients/elasticsearch/search.py | 22 +++--------- engine/clients/elasticsearch/upload.py | 21 ++---------- .../elasticsearch-single-node.json | 28 +++++++-------- 5 files changed, 53 insertions(+), 60 deletions(-) diff --git a/engine/clients/elasticsearch/config.py b/engine/clients/elasticsearch/config.py index 19b59d74..4154bd91 100644 --- a/engine/clients/elasticsearch/config.py +++ b/engine/clients/elasticsearch/config.py @@ -1,4 +1,30 @@ -ELASTIC_PORT = 9200 -ELASTIC_INDEX = "bench" -ELASTIC_USER = "elastic" -ELASTIC_PASSWORD = "passwd" +import os + +from elasticsearch import Elasticsearch + +ELASTIC_PORT = int(os.getenv("ELASTIC_PORT", 9200)) +ELASTIC_INDEX = os.getenv("ELASTIC_INDEX", "bench") +ELASTIC_USER = os.getenv("ELASTIC_USER", "elastic") +ELASTIC_PASSWORD = os.getenv("ELASTIC_PASSWORD", "passwd") + +ELASTIC_TIMEOUT = int(os.getenv("ELASTIC_TIMEOUT", 300)) +ELASTIC_INDEX_TIMEOUT = os.getenv("ELASTIC_INDEX_TIMEOUT", "30m") +ELASTIC_INDEX_REFRESH_INTERVAL = os.getenv("ELASTIC_INDEX_REFRESH_INTERVAL", "-1") + + +def get_es_client(host, connection_params): + client: Elasticsearch = None + init_params = { + "verify_certs": False, + "request_timeout": ELASTIC_TIMEOUT, + "retry_on_timeout": True, + "ssl_show_warn": False, + **connection_params, + } + client = Elasticsearch( + f"http://{host}:{ELASTIC_PORT}", + basic_auth=(ELASTIC_USER, ELASTIC_PASSWORD), + **init_params, + ) + assert client.ping() + return client diff --git a/engine/clients/elasticsearch/configure.py b/engine/clients/elasticsearch/configure.py index 76f64eb8..4bce1c66 100644 --- a/engine/clients/elasticsearch/configure.py +++ b/engine/clients/elasticsearch/configure.py @@ -72,12 +72,8 @@ def recreate(self, dataset: Dataset, collection_params): "index": True, "similarity": self.DISTANCE_MAPPING[dataset.config.distance], "index_options": { - **{ - "type": "hnsw", - "m": 16, - "ef_construction": 100, - }, - **collection_params.get("index_options"), + "type": "hnsw", + **collection_params["index_options"], }, }, **self._prepare_fields_config(dataset), diff --git a/engine/clients/elasticsearch/search.py b/engine/clients/elasticsearch/search.py index 29d20ec5..7a6600a8 100644 --- a/engine/clients/elasticsearch/search.py +++ b/engine/clients/elasticsearch/search.py @@ -7,10 +7,8 @@ from engine.base_client.search import BaseSearcher from engine.clients.elasticsearch.config import ( ELASTIC_INDEX, - ELASTIC_PASSWORD, - ELASTIC_PORT, - ELASTIC_USER, ) +from engine.clients.elasticsearch.config import get_es_client from engine.clients.elasticsearch.parser import ElasticConditionParser @@ -29,20 +27,8 @@ def get_mp_start_method(cls): return "forkserver" if "forkserver" in mp.get_all_start_methods() else "spawn" @classmethod - def init_client(cls, host, distance, connection_params: dict, search_params: dict): - init_params = { - **{ - "verify_certs": False, - "request_timeout": 90, - "retry_on_timeout": True, - }, - **connection_params, - } - cls.client: Elasticsearch = Elasticsearch( - f"http://{host}:{ELASTIC_PORT}", - basic_auth=(ELASTIC_USER, ELASTIC_PASSWORD), - **init_params, - ) + def init_client(cls, host, _distance, connection_params: dict, search_params: dict): + cls.client = get_es_client(host, connection_params) cls.search_params = search_params @classmethod @@ -51,7 +37,7 @@ def search_one(cls, vector, meta_conditions, top) -> List[Tuple[int, float]]: "field": "vector", "query_vector": vector, "k": top, - **{"num_candidates": 100, **cls.search_params}, + **cls.search_params["params"], } meta_conditions = cls.parser.parse(meta_conditions) diff --git a/engine/clients/elasticsearch/upload.py b/engine/clients/elasticsearch/upload.py index 0d5c6f2b..e748aeaf 100644 --- a/engine/clients/elasticsearch/upload.py +++ b/engine/clients/elasticsearch/upload.py @@ -5,14 +5,11 @@ from elasticsearch import Elasticsearch from engine.base_client.upload import BaseUploader +from engine.clients.elasticsearch.config import get_es_client from engine.clients.elasticsearch.config import ( ELASTIC_INDEX, - ELASTIC_PASSWORD, - ELASTIC_PORT, - ELASTIC_USER, ) - class ClosableElastic(Elasticsearch): def __del__(self): self.close() @@ -27,20 +24,8 @@ def get_mp_start_method(cls): return "forkserver" if "forkserver" in mp.get_all_start_methods() else "spawn" @classmethod - def init_client(cls, host, distance, connection_params, upload_params): - init_params = { - **{ - "verify_certs": False, - "request_timeout": 90, - "retry_on_timeout": True, - }, - **connection_params, - } - cls.client = Elasticsearch( - f"http://{host}:{ELASTIC_PORT}", - basic_auth=(ELASTIC_USER, ELASTIC_PASSWORD), - **init_params, - ) + def init_client(cls, host, _distance, connection_params, upload_params): + cls.client = get_es_client(host, connection_params) cls.upload_params = upload_params @classmethod diff --git a/experiments/configurations/elasticsearch-single-node.json b/experiments/configurations/elasticsearch-single-node.json index b3f0f609..3ffb4034 100644 --- a/experiments/configurations/elasticsearch-single-node.json +++ b/experiments/configurations/elasticsearch-single-node.json @@ -7,8 +7,8 @@ }, "collection_params": { "index_options": { "m": 16, "ef_construction": 100 } }, "search_params": [ - { "parallel": 1, "num_candidates": 128 }, { "parallel": 1, "num_candidates": 256 }, { "parallel": 1, "num_candidates": 512 }, - { "parallel": 100, "num_candidates": 128 }, { "parallel": 100, "num_candidates": 256 }, { "parallel": 100, "num_candidates": 512 } + { "parallel": 1, "params": {"num_candidates": 128} }, { "parallel": 1, "params": {"num_candidates": 256} }, { "parallel": 1, "params" :{"num_candidates": 512} }, + { "parallel": 100, "params": {"num_candidates": 128} }, { "parallel": 100, "params": {"num_candidates": 256} }, { "parallel": 100, "params" :{"num_candidates": 512} } ], "upload_params": { "parallel": 16 } }, @@ -20,8 +20,8 @@ }, "collection_params": { "index_options": { "m": 16, "ef_construction": 128 } }, "search_params": [ - { "parallel": 1, "num_candidates": 128 }, { "parallel": 1, "num_candidates": 256 }, { "parallel": 1, "num_candidates": 512 }, - { "parallel": 100, "num_candidates": 128 }, { "parallel": 100, "num_candidates": 256 }, { "parallel": 100, "num_candidates": 512 } + { "parallel": 1, "params": {"num_candidates": 128} }, { "parallel": 1, "params": {"num_candidates": 256} }, { "parallel": 1, "params" :{"num_candidates": 512} }, + { "parallel": 100, "params": {"num_candidates": 128} }, { "parallel": 100, "params": {"num_candidates": 256} }, { "parallel": 100, "params" :{"num_candidates": 512} } ], "upload_params": { "parallel": 16 } }, @@ -33,8 +33,8 @@ }, "collection_params": { "index_options": { "m": 32, "ef_construction": 128 } }, "search_params": [ - { "parallel": 1, "num_candidates": 128 }, { "parallel": 1, "num_candidates": 256 }, { "parallel": 1, "num_candidates": 512 }, - { "parallel": 100, "num_candidates": 128 }, { "parallel": 100, "num_candidates": 256 }, { "parallel": 100, "num_candidates": 512 } + { "parallel": 1, "params": {"num_candidates": 128} }, { "parallel": 1, "params": {"num_candidates": 256} }, { "parallel": 1, "params" :{"num_candidates": 512} }, + { "parallel": 100, "params": {"num_candidates": 128} }, { "parallel": 100, "params": {"num_candidates": 256} }, { "parallel": 100, "params" :{"num_candidates": 512} } ], "upload_params": { "parallel": 16 } }, @@ -46,8 +46,8 @@ }, "collection_params": { "index_options": { "m": 32, "ef_construction": 256 } }, "search_params": [ - { "parallel": 1, "num_candidates": 128 }, { "parallel": 1, "num_candidates": 256 }, { "parallel": 1, "num_candidates": 512 }, - { "parallel": 100, "num_candidates": 128 }, { "parallel": 100, "num_candidates": 256 }, { "parallel": 100, "num_candidates": 512 } + { "parallel": 1, "params": {"num_candidates": 128} }, { "parallel": 1, "params": {"num_candidates": 256} }, { "parallel": 1, "params" :{"num_candidates": 512} }, + { "parallel": 100, "params": {"num_candidates": 128} }, { "parallel": 100, "params": {"num_candidates": 256} }, { "parallel": 100, "params" :{"num_candidates": 512} } ], "upload_params": { "parallel": 16 } }, @@ -59,8 +59,8 @@ }, "collection_params": { "index_options": { "m": 32, "ef_construction": 512 } }, "search_params": [ - { "parallel": 1, "num_candidates": 128 }, { "parallel": 1, "num_candidates": 256 }, { "parallel": 1, "num_candidates": 512 }, - { "parallel": 100, "num_candidates": 128 }, { "parallel": 100, "num_candidates": 256 }, { "parallel": 100, "num_candidates": 512 } + { "parallel": 1, "params": {"num_candidates": 128} }, { "parallel": 1, "params": {"num_candidates": 256} }, { "parallel": 1, "params" :{"num_candidates": 512} }, + { "parallel": 100, "params": {"num_candidates": 128} }, { "parallel": 100, "params": {"num_candidates": 256} }, { "parallel": 100, "params" :{"num_candidates": 512} } ], "upload_params": { "parallel": 16 } }, @@ -72,8 +72,8 @@ }, "collection_params": { "index_options": { "m": 64, "ef_construction": 256 } }, "search_params": [ - { "parallel": 1, "num_candidates": 128 }, { "parallel": 1, "num_candidates": 256 }, { "parallel": 1, "num_candidates": 512 }, - { "parallel": 100, "num_candidates": 128 }, { "parallel": 100, "num_candidates": 256 }, { "parallel": 100, "num_candidates": 512 } + { "parallel": 1, "params": {"num_candidates": 128} }, { "parallel": 1, "params": {"num_candidates": 256} }, { "parallel": 1, "params" :{"num_candidates": 512} }, + { "parallel": 100, "params": {"num_candidates": 128} }, { "parallel": 100, "params": {"num_candidates": 256} }, { "parallel": 100, "params" :{"num_candidates": 512} } ], "upload_params": { "parallel": 16 } }, @@ -85,8 +85,8 @@ }, "collection_params": { "index_options": { "m": 64, "ef_construction": 512 } }, "search_params": [ - { "parallel": 1, "num_candidates": 128 }, { "parallel": 1, "num_candidates": 256 }, { "parallel": 1, "num_candidates": 512 }, - { "parallel": 100, "num_candidates": 128 }, { "parallel": 100, "num_candidates": 256 }, { "parallel": 100, "num_candidates": 512 } + { "parallel": 1, "params": {"num_candidates": 128} }, { "parallel": 1, "params": {"num_candidates": 256} }, { "parallel": 1, "params" :{"num_candidates": 512} }, + { "parallel": 100, "params": {"num_candidates": 128} }, { "parallel": 100, "params": {"num_candidates": 256} }, { "parallel": 100, "params" :{"num_candidates": 512} } ], "upload_params": { "parallel": 16 } } From 2d40ab19675c1f0b093b8fed36c8c1e8a4e420d7 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 11 Apr 2024 09:12:17 +0000 Subject: [PATCH 2/6] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- engine/clients/elasticsearch/search.py | 5 +---- engine/clients/elasticsearch/upload.py | 6 ++---- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/engine/clients/elasticsearch/search.py b/engine/clients/elasticsearch/search.py index 7a6600a8..254479ee 100644 --- a/engine/clients/elasticsearch/search.py +++ b/engine/clients/elasticsearch/search.py @@ -5,10 +5,7 @@ from elasticsearch import Elasticsearch from engine.base_client.search import BaseSearcher -from engine.clients.elasticsearch.config import ( - ELASTIC_INDEX, -) -from engine.clients.elasticsearch.config import get_es_client +from engine.clients.elasticsearch.config import ELASTIC_INDEX, get_es_client from engine.clients.elasticsearch.parser import ElasticConditionParser diff --git a/engine/clients/elasticsearch/upload.py b/engine/clients/elasticsearch/upload.py index e748aeaf..96a18d22 100644 --- a/engine/clients/elasticsearch/upload.py +++ b/engine/clients/elasticsearch/upload.py @@ -5,10 +5,8 @@ from elasticsearch import Elasticsearch from engine.base_client.upload import BaseUploader -from engine.clients.elasticsearch.config import get_es_client -from engine.clients.elasticsearch.config import ( - ELASTIC_INDEX, -) +from engine.clients.elasticsearch.config import ELASTIC_INDEX, get_es_client + class ClosableElastic(Elasticsearch): def __del__(self): From 31a867c9f1b245e29d5056568a69242183b4f29f Mon Sep 17 00:00:00 2001 From: KShivendu Date: Fri, 12 Apr 2024 12:36:48 +0530 Subject: [PATCH 3/6] fix: Remove extra config vars --- engine/clients/elasticsearch/config.py | 6 ------ engine/clients/elasticsearch/configure.py | 21 ++++----------------- 2 files changed, 4 insertions(+), 23 deletions(-) diff --git a/engine/clients/elasticsearch/config.py b/engine/clients/elasticsearch/config.py index 4154bd91..2ecd5ee9 100644 --- a/engine/clients/elasticsearch/config.py +++ b/engine/clients/elasticsearch/config.py @@ -7,16 +7,10 @@ ELASTIC_USER = os.getenv("ELASTIC_USER", "elastic") ELASTIC_PASSWORD = os.getenv("ELASTIC_PASSWORD", "passwd") -ELASTIC_TIMEOUT = int(os.getenv("ELASTIC_TIMEOUT", 300)) -ELASTIC_INDEX_TIMEOUT = os.getenv("ELASTIC_INDEX_TIMEOUT", "30m") -ELASTIC_INDEX_REFRESH_INTERVAL = os.getenv("ELASTIC_INDEX_REFRESH_INTERVAL", "-1") - def get_es_client(host, connection_params): - client: Elasticsearch = None init_params = { "verify_certs": False, - "request_timeout": ELASTIC_TIMEOUT, "retry_on_timeout": True, "ssl_show_warn": False, **connection_params, diff --git a/engine/clients/elasticsearch/configure.py b/engine/clients/elasticsearch/configure.py index 4bce1c66..4e9ae04c 100644 --- a/engine/clients/elasticsearch/configure.py +++ b/engine/clients/elasticsearch/configure.py @@ -6,9 +6,7 @@ from engine.base_client.distances import Distance from engine.clients.elasticsearch.config import ( ELASTIC_INDEX, - ELASTIC_PASSWORD, - ELASTIC_PORT, - ELASTIC_USER, + get_es_client ) @@ -18,6 +16,7 @@ class ElasticConfigurator(BaseConfigurator): Distance.COSINE: "cosine", Distance.DOT: "dot_product", } + # TODO: Add other types INDEX_TYPE_MAPPING = { "int": "long", "geo": "geo_point", @@ -25,19 +24,7 @@ class ElasticConfigurator(BaseConfigurator): def __init__(self, host, collection_params: dict, connection_params: dict): super().__init__(host, collection_params, connection_params) - init_params = { - **{ - "verify_certs": False, - "request_timeout": 90, - "retry_on_timeout": True, - }, - **connection_params, - } - self.client = Elasticsearch( - f"http://{host}:{ELASTIC_PORT}", - basic_auth=(ELASTIC_USER, ELASTIC_PASSWORD), - **init_params, - ) + self.client = get_es_client(host, connection_params) def clean(self): try: @@ -60,7 +47,7 @@ def recreate(self, dataset: Dataset, collection_params): "index": { "number_of_shards": 1, "number_of_replicas": 0, - "refresh_interval": -1, + "refresh_interval": -1, # no refresh is required because we index all the data at once } }, mappings={ From a7fe285b694b1a3331f896d6a6745eef20d69065 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 12 Apr 2024 07:10:57 +0000 Subject: [PATCH 4/6] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- engine/clients/elasticsearch/configure.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/engine/clients/elasticsearch/configure.py b/engine/clients/elasticsearch/configure.py index 4e9ae04c..d6c87ab1 100644 --- a/engine/clients/elasticsearch/configure.py +++ b/engine/clients/elasticsearch/configure.py @@ -4,10 +4,7 @@ from engine.base_client import IncompatibilityError from engine.base_client.configure import BaseConfigurator from engine.base_client.distances import Distance -from engine.clients.elasticsearch.config import ( - ELASTIC_INDEX, - get_es_client -) +from engine.clients.elasticsearch.config import ELASTIC_INDEX, get_es_client class ElasticConfigurator(BaseConfigurator): @@ -47,7 +44,7 @@ def recreate(self, dataset: Dataset, collection_params): "index": { "number_of_shards": 1, "number_of_replicas": 0, - "refresh_interval": -1, # no refresh is required because we index all the data at once + "refresh_interval": -1, # no refresh is required because we index all the data at once } }, mappings={ From 5a205584b83c542869ad8a668f51618a40c2d6da Mon Sep 17 00:00:00 2001 From: Kumar Shivendu Date: Mon, 15 Apr 2024 21:00:22 +0530 Subject: [PATCH 5/6] fix: Remove extra comment --- engine/clients/elasticsearch/configure.py | 1 - 1 file changed, 1 deletion(-) diff --git a/engine/clients/elasticsearch/configure.py b/engine/clients/elasticsearch/configure.py index d6c87ab1..6ce62494 100644 --- a/engine/clients/elasticsearch/configure.py +++ b/engine/clients/elasticsearch/configure.py @@ -13,7 +13,6 @@ class ElasticConfigurator(BaseConfigurator): Distance.COSINE: "cosine", Distance.DOT: "dot_product", } - # TODO: Add other types INDEX_TYPE_MAPPING = { "int": "long", "geo": "geo_point", From c527ae38890b1291924e50063ae8dc25188b9560 Mon Sep 17 00:00:00 2001 From: KShivendu Date: Mon, 15 Apr 2024 22:13:39 +0530 Subject: [PATCH 6/6] feat: Add keyword, text, and float index types in ES --- engine/clients/elasticsearch/configure.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/engine/clients/elasticsearch/configure.py b/engine/clients/elasticsearch/configure.py index 6ce62494..d46166a7 100644 --- a/engine/clients/elasticsearch/configure.py +++ b/engine/clients/elasticsearch/configure.py @@ -15,6 +15,9 @@ class ElasticConfigurator(BaseConfigurator): } INDEX_TYPE_MAPPING = { "int": "long", + "keyword": "keyword", + "text": "text", + "float": "double", "geo": "geo_point", }