Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 152 additions & 13 deletions langchain_postgres/v2/async_vectorstore.py

Large diffs are not rendered by default.

34 changes: 32 additions & 2 deletions langchain_postgres/v2/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
from sqlalchemy.engine import URL
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine

from .hybrid_search_config import HybridSearchConfig

T = TypeVar("T")


Expand Down Expand Up @@ -156,6 +158,7 @@ async def _ainit_vectorstore_table(
id_column: Union[str, Column, ColumnDict] = "langchain_id",
overwrite_existing: bool = False,
store_metadata: bool = True,
hybrid_search_config: Optional[HybridSearchConfig] = None,
) -> None:
"""
Create a table for saving of vectors to be used with PGVectorStore.
Expand All @@ -178,6 +181,8 @@ async def _ainit_vectorstore_table(
overwrite_existing (bool): Whether to drop existing table. Default: False.
store_metadata (bool): Whether to store metadata in the table.
Default: True.
hybrid_search_config (HybridSearchConfig): Hybrid search configuration.
Default: None.

Raises:
:class:`DuplicateTableError <asyncpg.exceptions.DuplicateTableError>`: if table already exists.
Expand All @@ -186,6 +191,7 @@ async def _ainit_vectorstore_table(

schema_name = self._escape_postgres_identifier(schema_name)
table_name = self._escape_postgres_identifier(table_name)
hybrid_search_default_column_name = content_column + "_tsv"
content_column = self._escape_postgres_identifier(content_column)
embedding_column = self._escape_postgres_identifier(embedding_column)
if metadata_columns is None:
Expand Down Expand Up @@ -226,10 +232,22 @@ async def _ainit_vectorstore_table(
id_data_type = id_column["data_type"]
id_column_name = id_column["name"]

hybrid_search_column = "" # Default is no TSV column for hybrid search
if hybrid_search_config:
hybrid_search_column_name = (
hybrid_search_config.tsv_column or hybrid_search_default_column_name
)
hybrid_search_column_name = self._escape_postgres_identifier(
hybrid_search_column_name
)
hybrid_search_config.tsv_column = hybrid_search_column_name
hybrid_search_column = f',"{self._escape_postgres_identifier(hybrid_search_column_name)}" TSVECTOR NOT NULL'

query = f"""CREATE TABLE "{schema_name}"."{table_name}"(
"{id_column_name}" {id_data_type} PRIMARY KEY,
"{content_column}" TEXT NOT NULL,
"{embedding_column}" vector({vector_size}) NOT NULL"""
"{embedding_column}" vector({vector_size}) NOT NULL
{hybrid_search_column}"""
for column in metadata_columns:
if isinstance(column, Column):
nullable = "NOT NULL" if not column.nullable else ""
Expand Down Expand Up @@ -258,6 +276,7 @@ async def ainit_vectorstore_table(
id_column: Union[str, Column, ColumnDict] = "langchain_id",
overwrite_existing: bool = False,
store_metadata: bool = True,
hybrid_search_config: Optional[HybridSearchConfig] = None,
) -> None:
"""
Create a table for saving of vectors to be used with PGVectorStore.
Expand All @@ -280,6 +299,10 @@ async def ainit_vectorstore_table(
overwrite_existing (bool): Whether to drop existing table. Default: False.
store_metadata (bool): Whether to store metadata in the table.
Default: True.
hybrid_search_config (HybridSearchConfig): Hybrid search configuration.
Note that queries might be slow if the hybrid search column does not exist.
For best hybrid search performance, consider creating a TSV column and adding a GIN index.
Default: None.
"""
await self._run_as_async(
self._ainit_vectorstore_table(
Expand All @@ -293,6 +316,7 @@ async def ainit_vectorstore_table(
id_column=id_column,
overwrite_existing=overwrite_existing,
store_metadata=store_metadata,
hybrid_search_config=hybrid_search_config,
)
)

Expand All @@ -309,6 +333,7 @@ def init_vectorstore_table(
id_column: Union[str, Column, ColumnDict] = "langchain_id",
overwrite_existing: bool = False,
store_metadata: bool = True,
hybrid_search_config: Optional[HybridSearchConfig] = None,
) -> None:
"""
Create a table for saving of vectors to be used with PGVectorStore.
Expand All @@ -331,6 +356,10 @@ def init_vectorstore_table(
overwrite_existing (bool): Whether to drop existing table. Default: False.
store_metadata (bool): Whether to store metadata in the table.
Default: True.
hybrid_search_config (HybridSearchConfig): Hybrid search configuration.
Note that queries might be slow if the hybrid search column does not exist.
For best hybrid search performance, consider creating a TSV column and adding a GIN index.
Default: None.
"""
self._run_as_sync(
self._ainit_vectorstore_table(
Expand All @@ -344,6 +373,7 @@ def init_vectorstore_table(
id_column=id_column,
overwrite_existing=overwrite_existing,
store_metadata=store_metadata,
hybrid_search_config=hybrid_search_config,
)
)

Expand All @@ -354,7 +384,7 @@ async def _adrop_table(
schema_name: str = "public",
) -> None:
"""Drop the vector store table"""
query = f'DROP TABLE "{schema_name}"."{table_name}";'
query = f'DROP TABLE IF EXISTS "{schema_name}"."{table_name}";'
async with self._pool.connect() as conn:
await conn.execute(text(query))
await conn.commit()
Expand Down
8 changes: 7 additions & 1 deletion langchain_postgres/v2/hybrid_search_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,13 @@ def reciprocal_rank_fusion(

@dataclass
class HybridSearchConfig(ABC):
"""AlloyDB Vector Store Hybrid Search Config."""
"""
AlloyDB Vector Store Hybrid Search Config.

Queries might be slow if the hybrid search column does not exist.
For best hybrid search performance, consider creating a TSV column
and adding a GIN index.
"""

tsv_column: Optional[str] = ""
tsv_lang: Optional[str] = "pg_catalog.english"
Expand Down
19 changes: 19 additions & 0 deletions langchain_postgres/v2/vectorstores.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from .async_vectorstore import AsyncPGVectorStore
from .engine import PGEngine
from .hybrid_search_config import HybridSearchConfig
from .indexes import (
DEFAULT_DISTANCE_STRATEGY,
BaseIndex,
Expand Down Expand Up @@ -59,6 +60,7 @@ async def create(
fetch_k: int = 20,
lambda_mult: float = 0.5,
index_query_options: Optional[QueryOptions] = None,
hybrid_search_config: Optional[HybridSearchConfig] = None,
) -> PGVectorStore:
"""Create an PGVectorStore instance.
Expand All @@ -78,6 +80,7 @@ async def create(
fetch_k (int): Number of Documents to fetch to pass to MMR algorithm.
lambda_mult (float): Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity. Defaults to 0.5.
index_query_options (QueryOptions): Index query option.
hybrid_search_config (HybridSearchConfig): Hybrid search configuration. Defaults to None.
Returns:
PGVectorStore
Expand All @@ -98,6 +101,7 @@ async def create(
fetch_k=fetch_k,
lambda_mult=lambda_mult,
index_query_options=index_query_options,
hybrid_search_config=hybrid_search_config,
)
vs = await engine._run_as_async(coro)
return cls(cls.__create_key, engine, vs)
Expand All @@ -120,6 +124,7 @@ def create_sync(
fetch_k: int = 20,
lambda_mult: float = 0.5,
index_query_options: Optional[QueryOptions] = None,
hybrid_search_config: Optional[HybridSearchConfig] = None,
) -> PGVectorStore:
"""Create an PGVectorStore instance.
Expand All @@ -140,6 +145,7 @@ def create_sync(
fetch_k (int, optional): Number of Documents to fetch to pass to MMR algorithm. Defaults to 20.
lambda_mult (float, optional): Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity. Defaults to 0.5.
index_query_options (Optional[QueryOptions], optional): Index query option. Defaults to None.
hybrid_search_config (HybridSearchConfig): Hybrid search configuration. Defaults to None.
Returns:
PGVectorStore
Expand All @@ -160,6 +166,7 @@ def create_sync(
fetch_k=fetch_k,
lambda_mult=lambda_mult,
index_query_options=index_query_options,
hybrid_search_config=hybrid_search_config,
)
vs = engine._run_as_sync(coro)
return cls(cls.__create_key, engine, vs)
Expand Down Expand Up @@ -301,6 +308,7 @@ async def afrom_texts( # type: ignore[override]
fetch_k: int = 20,
lambda_mult: float = 0.5,
index_query_options: Optional[QueryOptions] = None,
hybrid_search_config: Optional[HybridSearchConfig] = None,
**kwargs: Any,
) -> PGVectorStore:
"""Create an PGVectorStore instance from texts.
Expand All @@ -324,6 +332,7 @@ async def afrom_texts( # type: ignore[override]
fetch_k (int): Number of Documents to fetch to pass to MMR algorithm.
lambda_mult (float): Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity. Defaults to 0.5.
index_query_options (QueryOptions): Index query option.
hybrid_search_config (HybridSearchConfig): Hybrid search configuration. Defaults to None.
Raises:
:class:`InvalidTextRepresentationError <asyncpg.exceptions.InvalidTextRepresentationError>`: if the `ids` data type does not match that of the `id_column`.
Expand All @@ -347,6 +356,7 @@ async def afrom_texts( # type: ignore[override]
fetch_k=fetch_k,
lambda_mult=lambda_mult,
index_query_options=index_query_options,
hybrid_search_config=hybrid_search_config,
)
await vs.aadd_texts(texts, metadatas=metadatas, ids=ids)
return vs
Expand All @@ -371,6 +381,7 @@ async def afrom_documents( # type: ignore[override]
fetch_k: int = 20,
lambda_mult: float = 0.5,
index_query_options: Optional[QueryOptions] = None,
hybrid_search_config: Optional[HybridSearchConfig] = None,
**kwargs: Any,
) -> PGVectorStore:
"""Create an PGVectorStore instance from documents.
Expand All @@ -393,6 +404,7 @@ async def afrom_documents( # type: ignore[override]
fetch_k (int): Number of Documents to fetch to pass to MMR algorithm.
lambda_mult (float): Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity. Defaults to 0.5.
index_query_options (QueryOptions): Index query option.
hybrid_search_config (HybridSearchConfig): Hybrid search configuration. Defaults to None.
Raises:
:class:`InvalidTextRepresentationError <asyncpg.exceptions.InvalidTextRepresentationError>`: if the `ids` data type does not match that of the `id_column`.
Expand All @@ -417,6 +429,7 @@ async def afrom_documents( # type: ignore[override]
fetch_k=fetch_k,
lambda_mult=lambda_mult,
index_query_options=index_query_options,
hybrid_search_config=hybrid_search_config,
)
await vs.aadd_documents(documents, ids=ids)
return vs
Expand All @@ -442,6 +455,7 @@ def from_texts( # type: ignore[override]
fetch_k: int = 20,
lambda_mult: float = 0.5,
index_query_options: Optional[QueryOptions] = None,
hybrid_search_config: Optional[HybridSearchConfig] = None,
**kwargs: Any,
) -> PGVectorStore:
"""Create an PGVectorStore instance from texts.
Expand All @@ -465,6 +479,7 @@ def from_texts( # type: ignore[override]
fetch_k (int): Number of Documents to fetch to pass to MMR algorithm.
lambda_mult (float): Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity. Defaults to 0.5.
index_query_options (QueryOptions): Index query option.
hybrid_search_config (HybridSearchConfig): Hybrid search configuration. Defaults to None.
Raises:
:class:`InvalidTextRepresentationError <asyncpg.exceptions.InvalidTextRepresentationError>`: if the `ids` data type does not match that of the `id_column`.
Expand All @@ -488,6 +503,7 @@ def from_texts( # type: ignore[override]
fetch_k=fetch_k,
lambda_mult=lambda_mult,
index_query_options=index_query_options,
hybrid_search_config=hybrid_search_config,
**kwargs,
)
vs.add_texts(texts, metadatas=metadatas, ids=ids)
Expand All @@ -513,6 +529,7 @@ def from_documents( # type: ignore[override]
fetch_k: int = 20,
lambda_mult: float = 0.5,
index_query_options: Optional[QueryOptions] = None,
hybrid_search_config: Optional[HybridSearchConfig] = None,
**kwargs: Any,
) -> PGVectorStore:
"""Create an PGVectorStore instance from documents.
Expand All @@ -535,6 +552,7 @@ def from_documents( # type: ignore[override]
fetch_k (int): Number of Documents to fetch to pass to MMR algorithm.
lambda_mult (float): Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity. Defaults to 0.5.
index_query_options (QueryOptions): Index query option.
hybrid_search_config (HybridSearchConfig): Hybrid search configuration. Defaults to None.
Raises:
:class:`InvalidTextRepresentationError <asyncpg.exceptions.InvalidTextRepresentationError>`: if the `ids` data type does not match that of the `id_column`.
Expand All @@ -558,6 +576,7 @@ def from_documents( # type: ignore[override]
fetch_k=fetch_k,
lambda_mult=lambda_mult,
index_query_options=index_query_options,
hybrid_search_config=hybrid_search_config,
**kwargs,
)
vs.add_documents(documents, ids=ids)
Expand Down
73 changes: 65 additions & 8 deletions tests/unit_tests/v2/test_async_pg_vectorstore_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,13 @@

from langchain_postgres import PGEngine
from langchain_postgres.v2.async_vectorstore import AsyncPGVectorStore
from langchain_postgres.v2.indexes import (
DistanceStrategy,
HNSWIndex,
IVFFlatIndex,
)
from langchain_postgres.v2.hybrid_search_config import HybridSearchConfig
from langchain_postgres.v2.indexes import DistanceStrategy, HNSWIndex, IVFFlatIndex
from tests.utils import VECTORSTORE_CONNECTION_STRING as CONNECTION_STRING

uuid_str = str(uuid.uuid4()).replace("-", "_")
DEFAULT_TABLE = "default" + uuid_str
DEFAULT_HYBRID_TABLE = "hybrid" + uuid_str
DEFAULT_INDEX_NAME = "index" + uuid_str
VECTOR_SIZE = 768
SIMPLE_TABLE = "default_table"
Expand Down Expand Up @@ -55,8 +53,10 @@ class TestIndex:
async def engine(self) -> AsyncIterator[PGEngine]:
engine = PGEngine.from_connection_string(url=CONNECTION_STRING)
yield engine
await aexecute(engine, f"DROP TABLE IF EXISTS {DEFAULT_TABLE}")
await aexecute(engine, f"DROP TABLE IF EXISTS {SIMPLE_TABLE}")

await engine._adrop_table(DEFAULT_TABLE)
await engine._adrop_table(DEFAULT_HYBRID_TABLE)
await engine._adrop_table(SIMPLE_TABLE)
await engine.close()

@pytest_asyncio.fixture(scope="class")
Expand All @@ -73,7 +73,9 @@ async def vs(self, engine: PGEngine) -> AsyncIterator[AsyncPGVectorStore]:
yield vs

async def test_apply_default_name_vector_index(self, engine: PGEngine) -> None:
await engine._ainit_vectorstore_table(SIMPLE_TABLE, VECTOR_SIZE)
await engine._ainit_vectorstore_table(
SIMPLE_TABLE, VECTOR_SIZE, overwrite_existing=True
)
vs = await AsyncPGVectorStore.create(
engine,
embedding_service=embeddings_service,
Expand All @@ -92,6 +94,61 @@ async def test_aapply_vector_index(self, vs: AsyncPGVectorStore) -> None:
assert await vs.is_valid_index(DEFAULT_INDEX_NAME)
await vs.adrop_vector_index(DEFAULT_INDEX_NAME)

async def test_aapply_vector_index_non_hybrid_search_vs(
    self, vs: AsyncPGVectorStore
) -> None:
    """Check that applying a hybrid search index on ``vs`` raises ValueError.

    NOTE(review): per the test name, the ``vs`` fixture is expected to be a
    store created without a hybrid search config -- confirm against the
    fixture definition.
    """
    expect_value_error = pytest.raises(ValueError)
    with expect_value_error:
        await vs.aapply_hybrid_search_index()

async def test_aapply_hybrid_search_index_table_without_tsv_column(
    self, engine: PGEngine, vs: AsyncPGVectorStore
) -> None:
    """Apply, verify and drop a hybrid search index on ``DEFAULT_TABLE``.

    A new store with a ``HybridSearchConfig`` is created on ``DEFAULT_TABLE``
    and the index named in that config is checked before creation, after
    creation, and after being dropped.

    NOTE(review): per the test name, ``DEFAULT_TABLE`` is expected to have no
    TSV column -- confirm against the ``vs`` fixture that creates it.
    """
    tsv_index_name = "tsv_index_on_table_without_tsv_column_" + uuid_str

    # The ``vs`` fixture is not used directly (presumably it is requested so
    # that DEFAULT_TABLE exists -- TODO confirm). A separate, hybrid-search
    # enabled store is built under its own name instead of rebinding ``vs``.
    hybrid_vs = await AsyncPGVectorStore.create(
        engine,
        embedding_service=embeddings_service,
        table_name=DEFAULT_TABLE,
        hybrid_search_config=HybridSearchConfig(index_name=tsv_index_name),
    )

    # The index must not exist before it is applied.
    assert not await hybrid_vs.is_valid_index(tsv_index_name)

    await hybrid_vs.aapply_hybrid_search_index()
    assert await hybrid_vs.is_valid_index(tsv_index_name)

    # Dropping the index must make it invalid again.
    await hybrid_vs.adrop_vector_index(tsv_index_name)
    assert not await hybrid_vs.is_valid_index(tsv_index_name)

async def test_aapply_hybrid_search_index_table_with_tsv_column(
    self, engine: PGEngine
) -> None:
    """Apply, reindex and drop a hybrid search index on a table with a TSV column.

    ``DEFAULT_HYBRID_TABLE`` is created with a ``HybridSearchConfig`` naming
    ``tsv_column``, and a store using the same config is created on it. The
    index named in the config is checked before creation, after creation,
    after a reindex, and after being dropped.
    """
    # NOTE(review): the literal says "without" although this test targets a
    # table *with* a TSV column -- looks like a copy-paste; left unchanged
    # here, confirm before renaming.
    tsv_index_name = "tsv_index_on_table_without_tsv_column_" + uuid_str
    config = HybridSearchConfig(
        tsv_column="tsv_column",
        tsv_lang="pg_catalog.english",
        index_name=tsv_index_name,
    )
    await engine._ainit_vectorstore_table(
        DEFAULT_HYBRID_TABLE,
        VECTOR_SIZE,
        hybrid_search_config=config,
    )
    vs = await AsyncPGVectorStore.create(
        engine,
        embedding_service=embeddings_service,
        table_name=DEFAULT_HYBRID_TABLE,
        hybrid_search_config=config,
    )

    # The index must not exist before it is applied.
    assert not await vs.is_valid_index(tsv_index_name)

    await vs.aapply_hybrid_search_index()
    assert await vs.is_valid_index(tsv_index_name)

    # Reindexing must leave the index valid.
    await vs.areindex(tsv_index_name)
    assert await vs.is_valid_index(tsv_index_name)

    # Dropping the index must make it invalid again.
    await vs.adrop_vector_index(tsv_index_name)
    assert not await vs.is_valid_index(tsv_index_name)

async def test_areindex(self, vs: AsyncPGVectorStore) -> None:
if not await vs.is_valid_index(DEFAULT_INDEX_NAME):
index = HNSWIndex(name=DEFAULT_INDEX_NAME)
Expand Down
Loading