Skip to content

Commit 620e3e5

Browse files
feat: create hybrid search capable vector store table [2/N]
2 parents 57ceb2c + 076f0cb commit 620e3e5

File tree

8 files changed

+824
-33
lines changed

8 files changed

+824
-33
lines changed

langchain_postgres/v2/async_vectorstore.py

Lines changed: 152 additions & 13 deletions
Large diffs are not rendered by default.

langchain_postgres/v2/engine.py

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
from sqlalchemy.engine import URL
1010
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
1111

12+
from .hybrid_search_config import HybridSearchConfig
13+
1214
T = TypeVar("T")
1315

1416

@@ -156,6 +158,7 @@ async def _ainit_vectorstore_table(
156158
id_column: Union[str, Column, ColumnDict] = "langchain_id",
157159
overwrite_existing: bool = False,
158160
store_metadata: bool = True,
161+
hybrid_search_config: Optional[HybridSearchConfig] = None,
159162
) -> None:
160163
"""
161164
Create a table for saving of vectors to be used with PGVectorStore.
@@ -178,6 +181,8 @@ async def _ainit_vectorstore_table(
178181
overwrite_existing (bool): Whether to drop existing table. Default: False.
179182
store_metadata (bool): Whether to store metadata in the table.
180183
Default: True.
184+
hybrid_search_config (HybridSearchConfig): Hybrid search configuration.
185+
Default: None.
181186
182187
Raises:
183188
:class:`DuplicateTableError <asyncpg.exceptions.DuplicateTableError>`: if table already exists.
@@ -186,6 +191,7 @@ async def _ainit_vectorstore_table(
186191

187192
schema_name = self._escape_postgres_identifier(schema_name)
188193
table_name = self._escape_postgres_identifier(table_name)
194+
hybrid_search_default_column_name = content_column + "_tsv"
189195
content_column = self._escape_postgres_identifier(content_column)
190196
embedding_column = self._escape_postgres_identifier(embedding_column)
191197
if metadata_columns is None:
@@ -226,10 +232,22 @@ async def _ainit_vectorstore_table(
226232
id_data_type = id_column["data_type"]
227233
id_column_name = id_column["name"]
228234

235+
hybrid_search_column = "" # Default is no TSV column for hybrid search
236+
if hybrid_search_config:
237+
hybrid_search_column_name = (
238+
hybrid_search_config.tsv_column or hybrid_search_default_column_name
239+
)
240+
hybrid_search_column_name = self._escape_postgres_identifier(
241+
hybrid_search_column_name
242+
)
243+
hybrid_search_config.tsv_column = hybrid_search_column_name
244+
hybrid_search_column = f',"{self._escape_postgres_identifier(hybrid_search_column_name)}" TSVECTOR NOT NULL'
245+
229246
query = f"""CREATE TABLE "{schema_name}"."{table_name}"(
230247
"{id_column_name}" {id_data_type} PRIMARY KEY,
231248
"{content_column}" TEXT NOT NULL,
232-
"{embedding_column}" vector({vector_size}) NOT NULL"""
249+
"{embedding_column}" vector({vector_size}) NOT NULL
250+
{hybrid_search_column}"""
233251
for column in metadata_columns:
234252
if isinstance(column, Column):
235253
nullable = "NOT NULL" if not column.nullable else ""
@@ -258,6 +276,7 @@ async def ainit_vectorstore_table(
258276
id_column: Union[str, Column, ColumnDict] = "langchain_id",
259277
overwrite_existing: bool = False,
260278
store_metadata: bool = True,
279+
hybrid_search_config: Optional[HybridSearchConfig] = None,
261280
) -> None:
262281
"""
263282
Create a table for saving of vectors to be used with PGVectorStore.
@@ -280,6 +299,10 @@ async def ainit_vectorstore_table(
280299
overwrite_existing (bool): Whether to drop existing table. Default: False.
281300
store_metadata (bool): Whether to store metadata in the table.
282301
Default: True.
302+
hybrid_search_config (HybridSearchConfig): Hybrid search configuration.
303+
Note that queries might be slow if the hybrid search column does not exist.
304+
For best hybrid search performance, consider creating a TSV column and adding a GIN index.
305+
Default: None.
283306
"""
284307
await self._run_as_async(
285308
self._ainit_vectorstore_table(
@@ -293,6 +316,7 @@ async def ainit_vectorstore_table(
293316
id_column=id_column,
294317
overwrite_existing=overwrite_existing,
295318
store_metadata=store_metadata,
319+
hybrid_search_config=hybrid_search_config,
296320
)
297321
)
298322

@@ -309,6 +333,7 @@ def init_vectorstore_table(
309333
id_column: Union[str, Column, ColumnDict] = "langchain_id",
310334
overwrite_existing: bool = False,
311335
store_metadata: bool = True,
336+
hybrid_search_config: Optional[HybridSearchConfig] = None,
312337
) -> None:
313338
"""
314339
Create a table for saving of vectors to be used with PGVectorStore.
@@ -331,6 +356,10 @@ def init_vectorstore_table(
331356
overwrite_existing (bool): Whether to drop existing table. Default: False.
332357
store_metadata (bool): Whether to store metadata in the table.
333358
Default: True.
359+
hybrid_search_config (HybridSearchConfig): Hybrid search configuration.
360+
Note that queries might be slow if the hybrid search column does not exist.
361+
For best hybrid search performance, consider creating a TSV column and adding a GIN index.
362+
Default: None.
334363
"""
335364
self._run_as_sync(
336365
self._ainit_vectorstore_table(
@@ -344,6 +373,7 @@ def init_vectorstore_table(
344373
id_column=id_column,
345374
overwrite_existing=overwrite_existing,
346375
store_metadata=store_metadata,
376+
hybrid_search_config=hybrid_search_config,
347377
)
348378
)
349379

@@ -354,7 +384,7 @@ async def _adrop_table(
354384
schema_name: str = "public",
355385
) -> None:
356386
"""Drop the vector store table"""
357-
query = f'DROP TABLE "{schema_name}"."{table_name}";'
387+
query = f'DROP TABLE IF EXISTS "{schema_name}"."{table_name}";'
358388
async with self._pool.connect() as conn:
359389
await conn.execute(text(query))
360390
await conn.commit()

langchain_postgres/v2/hybrid_search_config.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,13 @@ def reciprocal_rank_fusion(
128128

129129
@dataclass
130130
class HybridSearchConfig(ABC):
131-
"""AlloyDB Vector Store Hybrid Search Config."""
131+
"""
132+
AlloyDB Vector Store Hybrid Search Config.
133+
134+
Queries might be slow if the hybrid search column does not exist.
135+
For best hybrid search performance, consider creating a TSV column
136+
and adding a GIN index.
137+
"""
132138

133139
tsv_column: Optional[str] = ""
134140
tsv_lang: Optional[str] = "pg_catalog.english"

langchain_postgres/v2/vectorstores.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
from .async_vectorstore import AsyncPGVectorStore
1111
from .engine import PGEngine
12+
from .hybrid_search_config import HybridSearchConfig
1213
from .indexes import (
1314
DEFAULT_DISTANCE_STRATEGY,
1415
BaseIndex,
@@ -59,6 +60,7 @@ async def create(
5960
fetch_k: int = 20,
6061
lambda_mult: float = 0.5,
6162
index_query_options: Optional[QueryOptions] = None,
63+
hybrid_search_config: Optional[HybridSearchConfig] = None,
6264
) -> PGVectorStore:
6365
"""Create a PGVectorStore instance.
6466
@@ -78,6 +80,7 @@ async def create(
7880
fetch_k (int): Number of Documents to fetch to pass to MMR algorithm.
7981
lambda_mult (float): Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity. Defaults to 0.5.
8082
index_query_options (QueryOptions): Index query option.
83+
hybrid_search_config (HybridSearchConfig): Hybrid search configuration. Defaults to None.
8184
8285
Returns:
8386
PGVectorStore
@@ -98,6 +101,7 @@ async def create(
98101
fetch_k=fetch_k,
99102
lambda_mult=lambda_mult,
100103
index_query_options=index_query_options,
104+
hybrid_search_config=hybrid_search_config,
101105
)
102106
vs = await engine._run_as_async(coro)
103107
return cls(cls.__create_key, engine, vs)
@@ -120,6 +124,7 @@ def create_sync(
120124
fetch_k: int = 20,
121125
lambda_mult: float = 0.5,
122126
index_query_options: Optional[QueryOptions] = None,
127+
hybrid_search_config: Optional[HybridSearchConfig] = None,
123128
) -> PGVectorStore:
124129
"""Create a PGVectorStore instance.
125130
@@ -140,6 +145,7 @@ def create_sync(
140145
fetch_k (int, optional): Number of Documents to fetch to pass to MMR algorithm. Defaults to 20.
141146
lambda_mult (float, optional): Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity. Defaults to 0.5.
142147
index_query_options (Optional[QueryOptions], optional): Index query option. Defaults to None.
148+
hybrid_search_config (HybridSearchConfig): Hybrid search configuration. Defaults to None.
143149
144150
Returns:
145151
PGVectorStore
@@ -160,6 +166,7 @@ def create_sync(
160166
fetch_k=fetch_k,
161167
lambda_mult=lambda_mult,
162168
index_query_options=index_query_options,
169+
hybrid_search_config=hybrid_search_config,
163170
)
164171
vs = engine._run_as_sync(coro)
165172
return cls(cls.__create_key, engine, vs)
@@ -301,6 +308,7 @@ async def afrom_texts( # type: ignore[override]
301308
fetch_k: int = 20,
302309
lambda_mult: float = 0.5,
303310
index_query_options: Optional[QueryOptions] = None,
311+
hybrid_search_config: Optional[HybridSearchConfig] = None,
304312
**kwargs: Any,
305313
) -> PGVectorStore:
306314
"""Create a PGVectorStore instance from texts.
@@ -324,6 +332,7 @@ async def afrom_texts( # type: ignore[override]
324332
fetch_k (int): Number of Documents to fetch to pass to MMR algorithm.
325333
lambda_mult (float): Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity. Defaults to 0.5.
326334
index_query_options (QueryOptions): Index query option.
335+
hybrid_search_config (HybridSearchConfig): Hybrid search configuration. Defaults to None.
327336
328337
Raises:
329338
:class:`InvalidTextRepresentationError <asyncpg.exceptions.InvalidTextRepresentationError>`: if the `ids` data type does not match that of the `id_column`.
@@ -347,6 +356,7 @@ async def afrom_texts( # type: ignore[override]
347356
fetch_k=fetch_k,
348357
lambda_mult=lambda_mult,
349358
index_query_options=index_query_options,
359+
hybrid_search_config=hybrid_search_config,
350360
)
351361
await vs.aadd_texts(texts, metadatas=metadatas, ids=ids)
352362
return vs
@@ -371,6 +381,7 @@ async def afrom_documents( # type: ignore[override]
371381
fetch_k: int = 20,
372382
lambda_mult: float = 0.5,
373383
index_query_options: Optional[QueryOptions] = None,
384+
hybrid_search_config: Optional[HybridSearchConfig] = None,
374385
**kwargs: Any,
375386
) -> PGVectorStore:
376387
"""Create a PGVectorStore instance from documents.
@@ -393,6 +404,7 @@ async def afrom_documents( # type: ignore[override]
393404
fetch_k (int): Number of Documents to fetch to pass to MMR algorithm.
394405
lambda_mult (float): Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity. Defaults to 0.5.
395406
index_query_options (QueryOptions): Index query option.
407+
hybrid_search_config (HybridSearchConfig): Hybrid search configuration. Defaults to None.
396408
397409
Raises:
398410
:class:`InvalidTextRepresentationError <asyncpg.exceptions.InvalidTextRepresentationError>`: if the `ids` data type does not match that of the `id_column`.
@@ -417,6 +429,7 @@ async def afrom_documents( # type: ignore[override]
417429
fetch_k=fetch_k,
418430
lambda_mult=lambda_mult,
419431
index_query_options=index_query_options,
432+
hybrid_search_config=hybrid_search_config,
420433
)
421434
await vs.aadd_documents(documents, ids=ids)
422435
return vs
@@ -442,6 +455,7 @@ def from_texts( # type: ignore[override]
442455
fetch_k: int = 20,
443456
lambda_mult: float = 0.5,
444457
index_query_options: Optional[QueryOptions] = None,
458+
hybrid_search_config: Optional[HybridSearchConfig] = None,
445459
**kwargs: Any,
446460
) -> PGVectorStore:
447461
"""Create a PGVectorStore instance from texts.
@@ -465,6 +479,7 @@ def from_texts( # type: ignore[override]
465479
fetch_k (int): Number of Documents to fetch to pass to MMR algorithm.
466480
lambda_mult (float): Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity. Defaults to 0.5.
467481
index_query_options (QueryOptions): Index query option.
482+
hybrid_search_config (HybridSearchConfig): Hybrid search configuration. Defaults to None.
468483
469484
Raises:
470485
:class:`InvalidTextRepresentationError <asyncpg.exceptions.InvalidTextRepresentationError>`: if the `ids` data type does not match that of the `id_column`.
@@ -488,6 +503,7 @@ def from_texts( # type: ignore[override]
488503
fetch_k=fetch_k,
489504
lambda_mult=lambda_mult,
490505
index_query_options=index_query_options,
506+
hybrid_search_config=hybrid_search_config,
491507
**kwargs,
492508
)
493509
vs.add_texts(texts, metadatas=metadatas, ids=ids)
@@ -513,6 +529,7 @@ def from_documents( # type: ignore[override]
513529
fetch_k: int = 20,
514530
lambda_mult: float = 0.5,
515531
index_query_options: Optional[QueryOptions] = None,
532+
hybrid_search_config: Optional[HybridSearchConfig] = None,
516533
**kwargs: Any,
517534
) -> PGVectorStore:
518535
"""Create a PGVectorStore instance from documents.
@@ -535,6 +552,7 @@ def from_documents( # type: ignore[override]
535552
fetch_k (int): Number of Documents to fetch to pass to MMR algorithm.
536553
lambda_mult (float): Number between 0 and 1 that determines the degree of diversity among the results with 0 corresponding to maximum diversity and 1 to minimum diversity. Defaults to 0.5.
537554
index_query_options (QueryOptions): Index query option.
555+
hybrid_search_config (HybridSearchConfig): Hybrid search configuration. Defaults to None.
538556
539557
Raises:
540558
:class:`InvalidTextRepresentationError <asyncpg.exceptions.InvalidTextRepresentationError>`: if the `ids` data type does not match that of the `id_column`.
@@ -558,6 +576,7 @@ def from_documents( # type: ignore[override]
558576
fetch_k=fetch_k,
559577
lambda_mult=lambda_mult,
560578
index_query_options=index_query_options,
579+
hybrid_search_config=hybrid_search_config,
561580
**kwargs,
562581
)
563582
vs.add_documents(documents, ids=ids)

tests/unit_tests/v2/test_async_pg_vectorstore_index.py

Lines changed: 65 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,13 @@
1010

1111
from langchain_postgres import PGEngine
1212
from langchain_postgres.v2.async_vectorstore import AsyncPGVectorStore
13-
from langchain_postgres.v2.indexes import (
14-
DistanceStrategy,
15-
HNSWIndex,
16-
IVFFlatIndex,
17-
)
13+
from langchain_postgres.v2.hybrid_search_config import HybridSearchConfig
14+
from langchain_postgres.v2.indexes import DistanceStrategy, HNSWIndex, IVFFlatIndex
1815
from tests.utils import VECTORSTORE_CONNECTION_STRING as CONNECTION_STRING
1916

2017
uuid_str = str(uuid.uuid4()).replace("-", "_")
2118
DEFAULT_TABLE = "default" + uuid_str
19+
DEFAULT_HYBRID_TABLE = "hybrid" + uuid_str
2220
DEFAULT_INDEX_NAME = "index" + uuid_str
2321
VECTOR_SIZE = 768
2422
SIMPLE_TABLE = "default_table"
@@ -55,8 +53,10 @@ class TestIndex:
5553
async def engine(self) -> AsyncIterator[PGEngine]:
5654
engine = PGEngine.from_connection_string(url=CONNECTION_STRING)
5755
yield engine
58-
await aexecute(engine, f"DROP TABLE IF EXISTS {DEFAULT_TABLE}")
59-
await aexecute(engine, f"DROP TABLE IF EXISTS {SIMPLE_TABLE}")
56+
57+
await engine._adrop_table(DEFAULT_TABLE)
58+
await engine._adrop_table(DEFAULT_HYBRID_TABLE)
59+
await engine._adrop_table(SIMPLE_TABLE)
6060
await engine.close()
6161

6262
@pytest_asyncio.fixture(scope="class")
@@ -73,7 +73,9 @@ async def vs(self, engine: PGEngine) -> AsyncIterator[AsyncPGVectorStore]:
7373
yield vs
7474

7575
async def test_apply_default_name_vector_index(self, engine: PGEngine) -> None:
76-
await engine._ainit_vectorstore_table(SIMPLE_TABLE, VECTOR_SIZE)
76+
await engine._ainit_vectorstore_table(
77+
SIMPLE_TABLE, VECTOR_SIZE, overwrite_existing=True
78+
)
7779
vs = await AsyncPGVectorStore.create(
7880
engine,
7981
embedding_service=embeddings_service,
@@ -92,6 +94,61 @@ async def test_aapply_vector_index(self, vs: AsyncPGVectorStore) -> None:
9294
assert await vs.is_valid_index(DEFAULT_INDEX_NAME)
9395
await vs.adrop_vector_index(DEFAULT_INDEX_NAME)
9496

97+
async def test_aapply_vector_index_non_hybrid_search_vs(
98+
self, vs: AsyncPGVectorStore
99+
) -> None:
100+
with pytest.raises(ValueError):
101+
await vs.aapply_hybrid_search_index()
102+
103+
async def test_aapply_hybrid_search_index_table_without_tsv_column(
104+
self, engine: PGEngine, vs: AsyncPGVectorStore
105+
) -> None:
106+
# overwriting vs to get a hybrid vs
107+
tsv_index_name = "tsv_index_on_table_without_tsv_column_" + uuid_str
108+
vs = await AsyncPGVectorStore.create(
109+
engine,
110+
embedding_service=embeddings_service,
111+
table_name=DEFAULT_TABLE,
112+
hybrid_search_config=HybridSearchConfig(index_name=tsv_index_name),
113+
)
114+
is_valid_index = await vs.is_valid_index(tsv_index_name)
115+
assert is_valid_index == False
116+
await vs.aapply_hybrid_search_index()
117+
assert await vs.is_valid_index(tsv_index_name)
118+
await vs.adrop_vector_index(tsv_index_name)
119+
is_valid_index = await vs.is_valid_index(tsv_index_name)
120+
assert is_valid_index == False
121+
122+
async def test_aapply_hybrid_search_index_table_with_tsv_column(
123+
self, engine: PGEngine
124+
) -> None:
125+
tsv_index_name = "tsv_index_on_table_without_tsv_column_" + uuid_str
126+
config = HybridSearchConfig(
127+
tsv_column="tsv_column",
128+
tsv_lang="pg_catalog.english",
129+
index_name=tsv_index_name,
130+
)
131+
await engine._ainit_vectorstore_table(
132+
DEFAULT_HYBRID_TABLE,
133+
VECTOR_SIZE,
134+
hybrid_search_config=config,
135+
)
136+
vs = await AsyncPGVectorStore.create(
137+
engine,
138+
embedding_service=embeddings_service,
139+
table_name=DEFAULT_HYBRID_TABLE,
140+
hybrid_search_config=config,
141+
)
142+
is_valid_index = await vs.is_valid_index(tsv_index_name)
143+
assert is_valid_index == False
144+
await vs.aapply_hybrid_search_index()
145+
assert await vs.is_valid_index(tsv_index_name)
146+
await vs.areindex(tsv_index_name)
147+
assert await vs.is_valid_index(tsv_index_name)
148+
await vs.adrop_vector_index(tsv_index_name)
149+
is_valid_index = await vs.is_valid_index(tsv_index_name)
150+
assert is_valid_index == False
151+
95152
async def test_areindex(self, vs: AsyncPGVectorStore) -> None:
96153
if not await vs.is_valid_index(DEFAULT_INDEX_NAME):
97154
index = HNSWIndex(name=DEFAULT_INDEX_NAME)

0 commit comments

Comments
 (0)