From 0505073ad88f0e490b19b75cdfd516e21fb5d311 Mon Sep 17 00:00:00 2001 From: dishaprakash <57954147+dishaprakash@users.noreply.github.com> Date: Tue, 7 Oct 2025 23:37:59 +0530 Subject: [PATCH 1/4] fix: Fix hybrid search bugs --- langchain_postgres/v2/async_vectorstore.py | 203 ++++++++++----------- 1 file changed, 100 insertions(+), 103 deletions(-) diff --git a/langchain_postgres/v2/async_vectorstore.py b/langchain_postgres/v2/async_vectorstore.py index ddb0e94..782569e 100644 --- a/langchain_postgres/v2/async_vectorstore.py +++ b/langchain_postgres/v2/async_vectorstore.py @@ -567,112 +567,109 @@ async def afrom_documents( # type: ignore[override] return vs async def __query_collection( - self, - embedding: list[float], - *, - k: Optional[int] = None, - filter: Optional[dict] = None, - **kwargs: Any, - ) -> Sequence[RowMapping]: - """ - Perform similarity search (or hybrid search) query on database. - Queries might be slow if the hybrid search column does not exist. - For best hybrid search performance, consider creating a TSV column - and adding GIN index. - """ - if not k: - k = ( - max( - self.k, - self.hybrid_search_config.primary_top_k, - self.hybrid_search_config.secondary_top_k, - ) - if self.hybrid_search_config - else self.k - ) - operator = self.distance_strategy.operator - search_function = self.distance_strategy.search_function - - columns = [ - self.id_column, - self.content_column, - self.embedding_column, - ] + self.metadata_columns - if self.metadata_json_column: - columns.append(self.metadata_json_column) - - column_names = ", ".join(f'"{col}"' for col in columns) - - safe_filter = None - filter_dict = None - if filter and isinstance(filter, dict): - safe_filter, filter_dict = self._create_filter_clause(filter) - - inline_embed_func = getattr(self.embedding_service, "embed_query_inline", None) - if not embedding and callable(inline_embed_func) and "query" in kwargs: - query_embedding = self.embedding_service.embed_query_inline(kwargs["query"]) # type: ignore - embedding_data_string = f"{query_embedding}" - else: - query_embedding = f"{[float(dimension) for dimension in embedding]}" - embedding_data_string = ":query_embedding" - where_filters = f"WHERE {safe_filter}" if safe_filter else "" - dense_query_stmt = f"""SELECT {column_names}, {search_function}("{self.embedding_column}", {embedding_data_string}) as distance - FROM "{self.schema_name}"."{self.table_name}" {where_filters} ORDER BY "{self.embedding_column}" {operator} {embedding_data_string} LIMIT :k; - """ - param_dict = {"query_embedding": query_embedding, "k": k} - if filter_dict: - param_dict.update(filter_dict) - if self.index_query_options: - async with self.engine.connect() as conn: - # Set each query option individually - for query_option in self.index_query_options.to_parameter(): - query_options_stmt = f"SET LOCAL {query_option};" - await conn.execute(text(query_options_stmt)) - result = await conn.execute(text(dense_query_stmt), param_dict) - result_map = result.mappings() - dense_results = result_map.fetchall() + self, + embedding: list[float], + *, + k: Optional[int] = None, + filter: Optional[dict] = None, + **kwargs: Any, +) -> Sequence[RowMapping]: + """ + Perform similarity search (or hybrid search) query on database. + Queries might be slow if the hybrid search column does not exist. + For best hybrid search performance, consider creating a TSV column + and adding GIN index. + """ + hybrid_search_config = kwargs.get( + "hybrid_search_config", self.hybrid_search_config + ) + + final_k = k if k is not None else self.k + + dense_limit = final_k + if hybrid_search_config: + dense_limit = hybrid_search_config.primary_top_k + + operator = self.distance_strategy.operator + search_function = self.distance_strategy.search_function + + columns = [ + self.id_column, + self.content_column, + self.embedding_column, + ] + self.metadata_columns + if self.metadata_json_column: + columns.append(self.metadata_json_column) + + column_names = ", ".join(f'"{col}"' for col in columns) + + safe_filter = None + filter_dict = None + if filter and isinstance(filter, dict): + safe_filter, filter_dict = self._create_filter_clause(filter) + + inline_embed_func = getattr(self.embedding_service, "embed_query_inline", None) + if not embedding and callable(inline_embed_func) and "query" in kwargs: + query_embedding = self.embedding_service.embed_query_inline(kwargs["query"]) # type: ignore + embedding_data_string = f"{query_embedding}" + else: + query_embedding = f"{[float(dimension) for dimension in embedding]}" + embedding_data_string = ":query_embedding" + where_filters = f"WHERE {safe_filter}" if safe_filter else "" + dense_query_stmt = f"""SELECT {column_names}, {search_function}("{self.embedding_column}", {embedding_data_string}) as distance + FROM "{self.schema_name}"."{self.table_name}" {where_filters} ORDER BY "{self.embedding_column}" {operator} {embedding_data_string} LIMIT :dense_limit; + """ + param_dict = {"query_embedding": query_embedding, "dense_limit": dense_limit} + if filter_dict: + param_dict.update(filter_dict) + if self.index_query_options: + async with self.engine.connect() as conn: + # Set each query option individually + for query_option in self.index_query_options.to_parameter(): + query_options_stmt = f"SET LOCAL {query_option};" + await conn.execute(text(query_options_stmt)) + result = await conn.execute(text(dense_query_stmt), param_dict) + result_map = result.mappings() + dense_results = result_map.fetchall() + else: + async with self.engine.connect() as conn: + result = await conn.execute(text(dense_query_stmt), param_dict) + result_map = result.mappings() + dense_results = result_map.fetchall() + + fts_query = ( + hybrid_search_config.fts_query + if hybrid_search_config and hybrid_search_config.fts_query + else kwargs.get("fts_query", "") + ) + if hybrid_search_config and fts_query: + hybrid_search_config.fusion_function_parameters["fetch_top_k"] = final_k + # do the sparse query + lang = ( + f"'{hybrid_search_config.tsv_lang}'," + if hybrid_search_config.tsv_lang + else "" + ) + query_tsv = f"plainto_tsquery({lang} :fts_query)" + param_dict["fts_query"] = fts_query + if hybrid_search_config.tsv_column: + content_tsv = f'"{hybrid_search_config.tsv_column}"' else: - async with self.engine.connect() as conn: - result = await conn.execute(text(dense_query_stmt), param_dict) - result_map = result.mappings() - dense_results = result_map.fetchall() + content_tsv = f'to_tsvector({lang} "{self.content_column}")' + and_filters = f"AND ({safe_filter})" if safe_filter else "" + sparse_query_stmt = f'SELECT {column_names}, ts_rank_cd({content_tsv}, {query_tsv}) as distance FROM "{self.schema_name}"."{self.table_name}" WHERE {content_tsv} @@ {query_tsv} {and_filters} ORDER BY distance desc LIMIT {hybrid_search_config.secondary_top_k};' + async with self.engine.connect() as conn: + result = await conn.execute(text(sparse_query_stmt), param_dict) + result_map = result.mappings() + sparse_results = result_map.fetchall() - hybrid_search_config = kwargs.get( - "hybrid_search_config", self.hybrid_search_config - ) - fts_query = ( - hybrid_search_config.fts_query - if hybrid_search_config and hybrid_search_config.fts_query - else kwargs.get("fts_query", "") + combined_results = hybrid_search_config.fusion_function( + dense_results, + sparse_results, + **hybrid_search_config.fusion_function_parameters, ) - if hybrid_search_config and fts_query: - hybrid_search_config.fusion_function_parameters["fetch_top_k"] = k - # do the sparse query - lang = ( - f"'{hybrid_search_config.tsv_lang}'," - if hybrid_search_config.tsv_lang - else "" - ) - query_tsv = f"plainto_tsquery({lang} :fts_query)" - param_dict["fts_query"] = fts_query - if hybrid_search_config.tsv_column: - content_tsv = f'"{hybrid_search_config.tsv_column}"' - else: - content_tsv = f'to_tsvector({lang} "{self.content_column}")' - and_filters = f"AND ({safe_filter})" if safe_filter else "" - sparse_query_stmt = f'SELECT {column_names}, ts_rank_cd({content_tsv}, {query_tsv}) as distance FROM "{self.schema_name}"."{self.table_name}" WHERE {content_tsv} @@ {query_tsv} {and_filters} ORDER BY distance desc LIMIT {hybrid_search_config.secondary_top_k};' - async with self.engine.connect() as conn: - result = await conn.execute(text(sparse_query_stmt), param_dict) - result_map = result.mappings() - sparse_results = result_map.fetchall() - - combined_results = hybrid_search_config.fusion_function( - dense_results, - sparse_results, - **hybrid_search_config.fusion_function_parameters, - ) - return combined_results - return dense_results + return combined_results + return dense_results async def asimilarity_search( self, From 71cf9ffb115d3325eecd09d3307ffbcea0e192f0 Mon Sep 17 00:00:00 2001 From: dishaprakash <57954147+dishaprakash@users.noreply.github.com> Date: Tue, 7 Oct 2025 23:39:16 +0530 Subject: [PATCH 2/4] Update async_vectorstore.py --- langchain_postgres/v2/async_vectorstore.py | 200 ++++++++++----------- 1 file changed, 100 insertions(+), 100 deletions(-) diff --git a/langchain_postgres/v2/async_vectorstore.py b/langchain_postgres/v2/async_vectorstore.py index 782569e..36aecc7 100644 --- a/langchain_postgres/v2/async_vectorstore.py +++ b/langchain_postgres/v2/async_vectorstore.py @@ -567,109 +567,109 @@ async def afrom_documents( # type: ignore[override] return vs async def __query_collection( - self, - embedding: list[float], - *, - k: Optional[int] = None, - filter: Optional[dict] = None, - **kwargs: Any, -) -> Sequence[RowMapping]: - """ - Perform similarity search (or hybrid search) query on database. - Queries might be slow if the hybrid search column does not exist. - For best hybrid search performance, consider creating a TSV column - and adding GIN index. - """ - hybrid_search_config = kwargs.get( - "hybrid_search_config", self.hybrid_search_config - ) - - final_k = k if k is not None else self.k - - dense_limit = final_k - if hybrid_search_config: - dense_limit = hybrid_search_config.primary_top_k - - operator = self.distance_strategy.operator - search_function = self.distance_strategy.search_function - - columns = [ - self.id_column, - self.content_column, - self.embedding_column, - ] + self.metadata_columns - if self.metadata_json_column: - columns.append(self.metadata_json_column) - - column_names = ", ".join(f'"{col}"' for col in columns) - - safe_filter = None - filter_dict = None - if filter and isinstance(filter, dict): - safe_filter, filter_dict = self._create_filter_clause(filter) - - inline_embed_func = getattr(self.embedding_service, "embed_query_inline", None) - if not embedding and callable(inline_embed_func) and "query" in kwargs: - query_embedding = self.embedding_service.embed_query_inline(kwargs["query"]) # type: ignore - embedding_data_string = f"{query_embedding}" - else: - query_embedding = f"{[float(dimension) for dimension in embedding]}" - embedding_data_string = ":query_embedding" - where_filters = f"WHERE {safe_filter}" if safe_filter else "" - dense_query_stmt = f"""SELECT {column_names}, {search_function}("{self.embedding_column}", {embedding_data_string}) as distance - FROM "{self.schema_name}"."{self.table_name}" {where_filters} ORDER BY "{self.embedding_column}" {operator} {embedding_data_string} LIMIT :dense_limit; - """ - param_dict = {"query_embedding": query_embedding, "dense_limit": dense_limit} - if filter_dict: - param_dict.update(filter_dict) - if self.index_query_options: - async with self.engine.connect() as conn: - # Set each query option individually - for query_option in self.index_query_options.to_parameter(): - query_options_stmt = f"SET LOCAL {query_option};" - await conn.execute(text(query_options_stmt)) - result = await conn.execute(text(dense_query_stmt), param_dict) - result_map = result.mappings() - dense_results = result_map.fetchall() - else: - async with self.engine.connect() as conn: - result = await conn.execute(text(dense_query_stmt), param_dict) - result_map = result.mappings() - dense_results = result_map.fetchall() - - fts_query = ( - hybrid_search_config.fts_query - if hybrid_search_config and hybrid_search_config.fts_query - else kwargs.get("fts_query", "") - ) - if hybrid_search_config and fts_query: - hybrid_search_config.fusion_function_parameters["fetch_top_k"] = final_k - # do the sparse query - lang = ( - f"'{hybrid_search_config.tsv_lang}'," - if hybrid_search_config.tsv_lang - else "" + self, + embedding: list[float], + *, + k: Optional[int] = None, + filter: Optional[dict] = None, + **kwargs: Any, + ) -> Sequence[RowMapping]: + """ + Perform similarity search (or hybrid search) query on database. + Queries might be slow if the hybrid search column does not exist. + For best hybrid search performance, consider creating a TSV column + and adding GIN index. + """ + hybrid_search_config = kwargs.get( + "hybrid_search_config", self.hybrid_search_config ) - query_tsv = f"plainto_tsquery({lang} :fts_query)" - param_dict["fts_query"] = fts_query - if hybrid_search_config.tsv_column: - content_tsv = f'"{hybrid_search_config.tsv_column}"' + + final_k = k if k is not None else self.k + + dense_limit = final_k + if hybrid_search_config: + dense_limit = hybrid_search_config.primary_top_k + + operator = self.distance_strategy.operator + search_function = self.distance_strategy.search_function + + columns = [ + self.id_column, + self.content_column, + self.embedding_column, + ] + self.metadata_columns + if self.metadata_json_column: + columns.append(self.metadata_json_column) + + column_names = ", ".join(f'"{col}"' for col in columns) + + safe_filter = None + filter_dict = None + if filter and isinstance(filter, dict): + safe_filter, filter_dict = self._create_filter_clause(filter) + + inline_embed_func = getattr(self.embedding_service, "embed_query_inline", None) + if not embedding and callable(inline_embed_func) and "query" in kwargs: + query_embedding = self.embedding_service.embed_query_inline(kwargs["query"]) # type: ignore + embedding_data_string = f"{query_embedding}" else: - content_tsv = f'to_tsvector({lang} "{self.content_column}")' - and_filters = f"AND ({safe_filter})" if safe_filter else "" - sparse_query_stmt = f'SELECT {column_names}, ts_rank_cd({content_tsv}, {query_tsv}) as distance FROM "{self.schema_name}"."{self.table_name}" WHERE {content_tsv} @@ {query_tsv} {and_filters} ORDER BY distance desc LIMIT {hybrid_search_config.secondary_top_k};' - async with self.engine.connect() as conn: - result = await conn.execute(text(sparse_query_stmt), param_dict) - result_map = result.mappings() - sparse_results = result_map.fetchall() - - combined_results = hybrid_search_config.fusion_function( - dense_results, - sparse_results, - **hybrid_search_config.fusion_function_parameters, + query_embedding = f"{[float(dimension) for dimension in embedding]}" + embedding_data_string = ":query_embedding" + where_filters = f"WHERE {safe_filter}" if safe_filter else "" + dense_query_stmt = f"""SELECT {column_names}, {search_function}("{self.embedding_column}", {embedding_data_string}) as distance + FROM "{self.schema_name}"."{self.table_name}" {where_filters} ORDER BY "{self.embedding_column}" {operator} {embedding_data_string} LIMIT :dense_limit; + """ + param_dict = {"query_embedding": query_embedding, "dense_limit": dense_limit} + if filter_dict: + param_dict.update(filter_dict) + if self.index_query_options: + async with self.engine.connect() as conn: + # Set each query option individually + for query_option in self.index_query_options.to_parameter(): + query_options_stmt = f"SET LOCAL {query_option};" + await conn.execute(text(query_options_stmt)) + result = await conn.execute(text(dense_query_stmt), param_dict) + result_map = result.mappings() + dense_results = result_map.fetchall() + else: + async with self.engine.connect() as conn: + result = await conn.execute(text(dense_query_stmt), param_dict) + result_map = result.mappings() + dense_results = result_map.fetchall() + + fts_query = ( + hybrid_search_config.fts_query + if hybrid_search_config and hybrid_search_config.fts_query + else kwargs.get("fts_query", "") ) - return combined_results - return dense_results + if hybrid_search_config and fts_query: + hybrid_search_config.fusion_function_parameters["fetch_top_k"] = final_k + # do the sparse query + lang = ( + f"'{hybrid_search_config.tsv_lang}'," + if hybrid_search_config.tsv_lang + else "" + ) + query_tsv = f"plainto_tsquery({lang} :fts_query)" + param_dict["fts_query"] = fts_query + if hybrid_search_config.tsv_column: + content_tsv = f'"{hybrid_search_config.tsv_column}"' + else: + content_tsv = f'to_tsvector({lang} "{self.content_column}")' + and_filters = f"AND ({safe_filter})" if safe_filter else "" + sparse_query_stmt = f'SELECT {column_names}, ts_rank_cd({content_tsv}, {query_tsv}) as distance FROM "{self.schema_name}"."{self.table_name}" WHERE {content_tsv} @@ {query_tsv} {and_filters} ORDER BY distance desc LIMIT {hybrid_search_config.secondary_top_k};' + async with self.engine.connect() as conn: + result = await conn.execute(text(sparse_query_stmt), param_dict) + result_map = result.mappings() + sparse_results = result_map.fetchall() + + combined_results = hybrid_search_config.fusion_function( + dense_results, + sparse_results, + **hybrid_search_config.fusion_function_parameters, + ) + return combined_results + return dense_results async def asimilarity_search( self, From 7c108a0e0c6a092b4f72ff620bd4001524606509 Mon Sep 17 00:00:00 2001 From: dishaprakash <57954147+dishaprakash@users.noreply.github.com> Date: Tue, 7 Oct 2025 23:42:05 +0530 Subject: [PATCH 3/4] Update async_vectorstore.py --- langchain_postgres/v2/async_vectorstore.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/langchain_postgres/v2/async_vectorstore.py b/langchain_postgres/v2/async_vectorstore.py index 36aecc7..bfef29b 100644 --- a/langchain_postgres/v2/async_vectorstore.py +++ b/langchain_postgres/v2/async_vectorstore.py @@ -592,7 +592,7 @@ async def __query_collection( operator = self.distance_strategy.operator search_function = self.distance_strategy.search_function - + columns = [ self.id_column, self.content_column, @@ -600,14 +600,14 @@ async def __query_collection( ] + self.metadata_columns if self.metadata_json_column: columns.append(self.metadata_json_column) - + column_names = ", ".join(f'"{col}"' for col in columns) - + safe_filter = None filter_dict = None if filter and isinstance(filter, dict): safe_filter, filter_dict = self._create_filter_clause(filter) - + inline_embed_func = getattr(self.embedding_service, "embed_query_inline", None) if not embedding and callable(inline_embed_func) and "query" in kwargs: query_embedding = self.embedding_service.embed_query_inline(kwargs["query"]) # type: ignore @@ -662,7 +662,7 @@ async def __query_collection( result = await conn.execute(text(sparse_query_stmt), param_dict) result_map = result.mappings() sparse_results = result_map.fetchall() - + combined_results = hybrid_search_config.fusion_function( dense_results, sparse_results, From c8c9727e29384837213a62caaadbbc7d24fed360 Mon Sep 17 00:00:00 2001 From: dishaprakash <57954147+dishaprakash@users.noreply.github.com> Date: Thu, 9 Oct 2025 18:13:54 +0530 Subject: [PATCH 4/4] Update async_vectorstore.py --- langchain_postgres/v2/async_vectorstore.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/langchain_postgres/v2/async_vectorstore.py b/langchain_postgres/v2/async_vectorstore.py index 2d58a1c..c83930e 100644 --- a/langchain_postgres/v2/async_vectorstore.py +++ b/langchain_postgres/v2/async_vectorstore.py @@ -583,13 +583,13 @@ async def __query_collection( hybrid_search_config = kwargs.get( "hybrid_search_config", self.hybrid_search_config ) - + final_k = k if k is not None else self.k - + dense_limit = final_k if hybrid_search_config: dense_limit = hybrid_search_config.primary_top_k - + operator = self.distance_strategy.operator search_function = self.distance_strategy.search_function @@ -636,7 +636,7 @@ async def __query_collection( result = await conn.execute(text(dense_query_stmt), param_dict) result_map = result.mappings() dense_results = result_map.fetchall() - + fts_query = ( hybrid_search_config.fts_query if hybrid_search_config and hybrid_search_config.fts_query