Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
22088a1
feat: Added Hybrid Search Config and Tests [1/N]
vishwarajanand May 19, 2025
30942ff
feat: create hybrid search capable vector store table [2/N]
vishwarajanand May 19, 2025
e641575
feat: adds hybrid search for async VS interface [3/N]
vishwarajanand May 19, 2025
2a0bf0d
feat: adds hybrid search for sync VS interface [4/N]
vishwarajanand May 19, 2025
0562678
Merge branch 'main' into hybrid_search_1
vishwarajanand May 30, 2025
70ee300
fix: tests
vishwarajanand May 30, 2025
5234648
fix: pr comments
vishwarajanand May 30, 2025
73d4400
fix: lint
vishwarajanand May 30, 2025
57ceb2c
fix: lint
vishwarajanand May 30, 2025
678e7b1
Merge branch 'hybrid_search_1' into hybrid_search_2
vishwarajanand May 30, 2025
7feb7a0
Merge branch 'hybrid_search_2' into hybrid_search_3
vishwarajanand May 30, 2025
ef349a3
pr comment: add disclaimer on slow query on config docstring
vishwarajanand May 30, 2025
ceabf10
pr comment: add disclaimer in engine table create
vishwarajanand May 30, 2025
9611164
Merge branch 'hybrid_search_1' into hybrid_search_2
vishwarajanand May 30, 2025
8a39e61
feat: address pr comments
vishwarajanand Jun 2, 2025
e5bd215
Merge branch 'hybrid_search_2' into hybrid_search_3
vishwarajanand Jun 2, 2025
6854ee0
fix: tsv column name in tests
vishwarajanand Jun 2, 2025
5bf1a4b
fix: add if exists in drop to avoid failures
vishwarajanand Jun 2, 2025
4153c2d
Merge branch 'hybrid_search_3' into hybrid_search_4
vishwarajanand Jun 2, 2025
e092c82
fix: tests
vishwarajanand Jun 2, 2025
08a4ff6
feat: adds hybrid search for sync VS interface [4/N]
vishwarajanand Jun 3, 2025
076f0cb
feat: adds hybrid search for async VS interface [3/N]
vishwarajanand Jun 3, 2025
620e3e5
feat: create hybrid search capable vector store table [2/N]
vishwarajanand Jun 3, 2025
0d223fd
chore: fix lint
vishwarajanand Jun 3, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 152 additions & 13 deletions langchain_postgres/v2/async_vectorstore.py

Large diffs are not rendered by default.

34 changes: 32 additions & 2 deletions langchain_postgres/v2/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from sqlalchemy.engine import URL
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine

from .hybrid_search_config import HybridSearchConfig

T = TypeVar("T")


Expand Down Expand Up @@ -156,6 +158,7 @@ async def _ainit_vectorstore_table(
id_column: Union[str, Column, ColumnDict] = "langchain_id",
overwrite_existing: bool = False,
store_metadata: bool = True,
hybrid_search_config: Optional[HybridSearchConfig] = None,
) -> None:
"""
Create a table for saving of vectors to be used with PGVectorStore.
Expand All @@ -178,6 +181,8 @@ async def _ainit_vectorstore_table(
overwrite_existing (bool): Whether to drop existing table. Default: False.
store_metadata (bool): Whether to store metadata in the table.
Default: True.
hybrid_search_config (HybridSearchConfig): Hybrid search configuration.
Default: None.

Raises:
:class:`DuplicateTableError <asyncpg.exceptions.DuplicateTableError>`: if table already exists.
Expand All @@ -186,6 +191,7 @@ async def _ainit_vectorstore_table(

schema_name = self._escape_postgres_identifier(schema_name)
table_name = self._escape_postgres_identifier(table_name)
hybrid_search_default_column_name = content_column + "_tsv"
content_column = self._escape_postgres_identifier(content_column)
embedding_column = self._escape_postgres_identifier(embedding_column)
if metadata_columns is None:
Expand Down Expand Up @@ -226,10 +232,22 @@ async def _ainit_vectorstore_table(
id_data_type = id_column["data_type"]
id_column_name = id_column["name"]

hybrid_search_column = "" # Default is no TSV column for hybrid search
if hybrid_search_config:
hybrid_search_column_name = (
hybrid_search_config.tsv_column or hybrid_search_default_column_name
)
hybrid_search_column_name = self._escape_postgres_identifier(
hybrid_search_column_name
)
hybrid_search_config.tsv_column = hybrid_search_column_name
hybrid_search_column = f',"{self._escape_postgres_identifier(hybrid_search_column_name)}" TSVECTOR NOT NULL'

query = f"""CREATE TABLE "{schema_name}"."{table_name}"(
"{id_column_name}" {id_data_type} PRIMARY KEY,
"{content_column}" TEXT NOT NULL,
"{embedding_column}" vector({vector_size}) NOT NULL"""
"{embedding_column}" vector({vector_size}) NOT NULL
{hybrid_search_column}"""
for column in metadata_columns:
if isinstance(column, Column):
nullable = "NOT NULL" if not column.nullable else ""
Expand Down Expand Up @@ -258,6 +276,7 @@ async def ainit_vectorstore_table(
id_column: Union[str, Column, ColumnDict] = "langchain_id",
overwrite_existing: bool = False,
store_metadata: bool = True,
hybrid_search_config: Optional[HybridSearchConfig] = None,
) -> None:
"""
Create a table for saving of vectors to be used with PGVectorStore.
Expand All @@ -280,6 +299,10 @@ async def ainit_vectorstore_table(
overwrite_existing (bool): Whether to drop existing table. Default: False.
store_metadata (bool): Whether to store metadata in the table.
Default: True.
hybrid_search_config (HybridSearchConfig): Hybrid search configuration.
Note that queries might be slow if the hybrid search column does not exist.
For best hybrid search performance, consider creating a TSV column and adding GIN index.
Default: None.
"""
await self._run_as_async(
self._ainit_vectorstore_table(
Expand All @@ -293,6 +316,7 @@ async def ainit_vectorstore_table(
id_column=id_column,
overwrite_existing=overwrite_existing,
store_metadata=store_metadata,
hybrid_search_config=hybrid_search_config,
)
)

Expand All @@ -309,6 +333,7 @@ def init_vectorstore_table(
id_column: Union[str, Column, ColumnDict] = "langchain_id",
overwrite_existing: bool = False,
store_metadata: bool = True,
hybrid_search_config: Optional[HybridSearchConfig] = None,
) -> None:
"""
Create a table for saving of vectors to be used with PGVectorStore.
Expand All @@ -331,6 +356,10 @@ def init_vectorstore_table(
overwrite_existing (bool): Whether to drop existing table. Default: False.
store_metadata (bool): Whether to store metadata in the table.
Default: True.
hybrid_search_config (HybridSearchConfig): Hybrid search configuration.
Note that queries might be slow if the hybrid search column does not exist.
For best hybrid search performance, consider creating a TSV column and adding GIN index.
Default: None.
"""
self._run_as_sync(
self._ainit_vectorstore_table(
Expand All @@ -344,6 +373,7 @@ def init_vectorstore_table(
id_column=id_column,
overwrite_existing=overwrite_existing,
store_metadata=store_metadata,
hybrid_search_config=hybrid_search_config,
)
)

Expand All @@ -354,7 +384,7 @@ async def _adrop_table(
schema_name: str = "public",
) -> None:
"""Drop the vector store table"""
query = f'DROP TABLE "{schema_name}"."{table_name}";'
query = f'DROP TABLE IF EXISTS "{schema_name}"."{table_name}";'
async with self._pool.connect() as conn:
await conn.execute(text(query))
await conn.commit()
Expand Down
149 changes: 149 additions & 0 deletions langchain_postgres/v2/hybrid_search_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
from abc import ABC
from dataclasses import dataclass, field
from typing import Any, Callable, Optional, Sequence

from sqlalchemy import RowMapping


def weighted_sum_ranking(
    primary_search_results: Sequence[RowMapping],
    secondary_search_results: Sequence[RowMapping],
    primary_results_weight: float = 0.5,
    secondary_results_weight: float = 0.5,
    fetch_top_k: int = 4,
) -> Sequence[dict[str, Any]]:
    """
    Ranks documents using a weighted sum of scores from two sources.

    Each result row is expected to carry its document id as the FIRST
    value and its score/distance as the LAST value.

    Args:
        primary_search_results: Rows from the primary search; each row's
            first value is the doc id, last value is the distance/score.
        secondary_search_results: Rows from the secondary search, same
            layout as the primary rows.
        primary_results_weight: The weight for the primary source's scores.
            Defaults to 0.5.
        secondary_results_weight: The weight for the secondary source's scores.
            Defaults to 0.5.
        fetch_top_k: The number of documents to fetch after merging the results.
            Defaults to 4.

    Returns:
        A list of row dicts, each with its combined weighted score stored
        under the "distance" key, sorted by that score in descending order
        and truncated to fetch_top_k entries.
    """

    # doc_id -> row dict carrying the combined weighted score in "distance"
    weighted_scores: dict[str, dict[str, Any]] = {}

    # Process results from primary source: seed each doc with its weighted score
    for row in primary_search_results:
        values = list(row.values())
        doc_id = str(values[0])  # first value is doc_id
        distance = float(values[-1])  # last value is distance
        row_values = dict(row)
        row_values["distance"] = primary_results_weight * distance
        weighted_scores[doc_id] = row_values

    # Process results from secondary source, adding to existing scores or
    # creating new entries (a doc seen in both sources keeps the secondary
    # row's columns, with the scores summed)
    for row in secondary_search_results:
        values = list(row.values())
        doc_id = str(values[0])  # first value is doc_id
        distance = float(values[-1])  # last value is distance
        existing = weighted_scores.get(doc_id)
        primary_score = existing["distance"] if existing is not None else 0.0
        row_values = dict(row)
        row_values["distance"] = distance * secondary_results_weight + primary_score
        weighted_scores[doc_id] = row_values

    # Sort the results by weighted score in descending order
    ranked_results = sorted(
        weighted_scores.values(), key=lambda item: item["distance"], reverse=True
    )
    return ranked_results[:fetch_top_k]


def reciprocal_rank_fusion(
    primary_search_results: Sequence[RowMapping],
    secondary_search_results: Sequence[RowMapping],
    rrf_k: float = 60,
    fetch_top_k: int = 4,
) -> Sequence[dict[str, Any]]:
    """
    Ranks documents using Reciprocal Rank Fusion (RRF) of scores from two sources.

    Each result row is expected to carry its document id as the FIRST value
    and a "distance" key. Within each source, rows are ranked by "distance"
    in descending order (i.e. higher is treated as better) before the RRF
    contribution 1 / (rank + rrf_k) is accumulated per document.

    Args:
        primary_search_results: Rows from the primary search.
        secondary_search_results: Rows from the secondary search.
        rrf_k: The RRF constant k; larger values flatten the contribution
            of rank differences. Defaults to 60.
        fetch_top_k: The number of documents to fetch after merging the results.
            Defaults to 4.

    Returns:
        A list of row dicts, each with its fused RRF score stored under the
        "distance" key, sorted by that score in descending order and
        truncated to fetch_top_k entries.
    """
    # doc_id -> row dict carrying the accumulated RRF score in "distance"
    rrf_scores: dict[str, dict[str, Any]] = {}

    def _accumulate(results: Sequence[RowMapping]) -> None:
        # Rank this source's rows by descending "distance" and add each
        # row's reciprocal-rank contribution to its document's total.
        for rank, row in enumerate(
            sorted(results, key=lambda item: item["distance"], reverse=True)
        ):
            doc_id = str(list(row.values())[0])  # first value is doc_id
            existing = rrf_scores.get(doc_id)
            score = existing["distance"] if existing is not None else 0.0
            row_values = dict(row)
            row_values["distance"] = score + 1.0 / (rank + rrf_k)
            rrf_scores[doc_id] = row_values

    # Process both sources; a doc seen in both keeps the secondary row's
    # columns, with the RRF contributions summed.
    _accumulate(primary_search_results)
    _accumulate(secondary_search_results)

    # Sort the results by rrf score in descending order
    ranked_results = sorted(
        rrf_scores.values(), key=lambda item: item["distance"], reverse=True
    )
    return ranked_results[:fetch_top_k]


@dataclass
class HybridSearchConfig(ABC):
    """
    AlloyDB Vector Store Hybrid Search Config.

    Queries might be slow if the hybrid search column does not exist.
    For best hybrid search performance, consider creating a TSV column
    and adding GIN index.
    """

    # NOTE(review): inherits ABC but declares no abstract methods, so
    # instances remain constructible — confirm whether ABC is intentional.

    # Name of the TSVECTOR column; "" lets the consumer derive a default
    # (e.g. "<content_column>_tsv" — TODO confirm against engine code).
    tsv_column: Optional[str] = ""
    # Postgres text-search configuration used for the TSV column.
    tsv_lang: Optional[str] = "pg_catalog.english"
    # Full-text search query string; "" presumably falls back to the main
    # search query — verify against the vector store implementation.
    fts_query: Optional[str] = ""
    # Callable that merges primary and secondary result rows into a single
    # ranked list; defaults to weighted_sum_ranking.
    fusion_function: Callable[
        [Sequence[RowMapping], Sequence[RowMapping], Any], Sequence[Any]
    ] = weighted_sum_ranking  # Updated default
    # Extra keyword arguments forwarded to fusion_function (e.g. weights,
    # fetch_top_k) — presumably unpacked as **kwargs by the caller.
    fusion_function_parameters: dict[str, Any] = field(default_factory=dict)
    # Number of rows fetched from the primary (vector) search before fusion.
    primary_top_k: int = 4
    # Number of rows fetched from the secondary (full-text) search before fusion.
    secondary_top_k: int = 4
    # Name and type of the full-text index to create over the TSV column.
    index_name: str = "langchain_tsv_index"
    index_type: str = "GIN"
Loading