Merged
Changes from all commits
22 commits
d14e5fe
feat: Add sparse vectors benchmark support in Qdrant
KShivendu Apr 3, 2024
b9be8bb
fix: Self review
KShivendu Apr 3, 2024
7af636a
feat: Add sparse dataset for CI benchmarks
KShivendu Apr 3, 2024
ce902b2
feat: Introduce SparseVector class
KShivendu Apr 8, 2024
50ca05f
feat: Disallow sparse vector dataset being run with non sparse vector…
KShivendu Apr 8, 2024
feb3323
feat: use different engine config to run sparse vector benchmarks
KShivendu Apr 9, 2024
2a653f7
fix: use different engine config to run sparse vector benchmarks
KShivendu Apr 9, 2024
9d0fc40
feat: Optimize CI benchmarks workflow
KShivendu Apr 9, 2024
218c775
feat: Add 1M sparse dataset
KShivendu Apr 9, 2024
36bcfaa
fix: remove scipy, read csr matrix manually (#117)
joein Apr 9, 2024
4a7f09d
fix: Dataset query reader should have sparse_vector=None by default
KShivendu Apr 9, 2024
074d06c
refactor: Changes based on feedback
KShivendu Apr 10, 2024
8622eea
refactoring: refactor sparse vector support (#118)
joein Apr 11, 2024
6be36d2
feat: Use pydantic construct
KShivendu Apr 11, 2024
174ef91
refactor: Update all engines to use Query and Record dataclasses (#116)
KShivendu Apr 16, 2024
750ad61
Merge branch 'master' into feat/sparse-ci-benchmarks
KShivendu Apr 17, 2024
931d9e3
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Apr 17, 2024
6aa5ee8
fix: Type issue
KShivendu Apr 17, 2024
542fd33
fix: Allow python 3.8 since scipy is now removed
KShivendu Apr 17, 2024
b11d41f
fix: Add missing redis-m-16-ef-128 config
KShivendu Apr 17, 2024
a30f25b
fix: redis container port
KShivendu Apr 17, 2024
4091b78
fix linter
generall Apr 17, 2024
33 changes: 22 additions & 11 deletions .github/workflows/continuous-benchmark.yaml
@@ -15,18 +15,29 @@ jobs:
- uses: webfactory/[email protected]
with:
ssh-private-key: ${{ secrets.SSH_PRIVATE_KEY }}
- name: Setup CI
run: bash -x tools/setup_ci.sh
- name: Benches
run: |
export HCLOUD_TOKEN=${{ secrets.HCLOUD_TOKEN }}
export GCS_KEY=${{ secrets.GCS_KEY }}
export GCS_SECRET=${{ secrets.GCS_SECRET }}
export POSTGRES_PASSWORD=${{ secrets.POSTGRES_PASSWORD }}
export POSTGRES_HOST=${{ secrets.POSTGRES_HOST }}
export HCLOUD_TOKEN=${{ secrets.HCLOUD_TOKEN }}
export GCS_KEY=${{ secrets.GCS_KEY }}
export GCS_SECRET=${{ secrets.GCS_SECRET }}
export POSTGRES_PASSWORD=${{ secrets.POSTGRES_PASSWORD }}
export POSTGRES_HOST=${{ secrets.POSTGRES_HOST }}

# Benchmark the dev branch:
export QDRANT_VERSION=ghcr/dev
bash -x tools/run_ci.sh
declare -A DATASET_TO_ENGINE
DATASET_TO_ENGINE["laion-small-clip"]="qdrant-continuous-benchmark"
DATASET_TO_ENGINE["msmarco-sparse-1M"]="qdrant-sparse-vector"

# Benchmark the master branch:
export QDRANT_VERSION=docker/master
bash -x tools/run_ci.sh
for dataset in "${!DATASET_TO_ENGINE[@]}"; do
export ENGINE_NAME=${DATASET_TO_ENGINE[$dataset]}
export DATASETS=$dataset

# Benchmark the dev branch:
export QDRANT_VERSION=ghcr/dev
bash -x tools/run_ci.sh

# Benchmark the master branch:
export QDRANT_VERSION=docker/master
bash -x tools/run_ci.sh
done
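The reworked Benches step maps each dataset to the engine config it should run with, then benchmarks both Qdrant builds per dataset. A rough Python equivalent of that control flow, useful only as a local illustration (the CI runs the bash above; the subprocess invocation here is an assumption):

```python
import os
import subprocess

# Mirror the CI matrix locally: one run of tools/run_ci.sh per dataset/engine
# pair, against both the dev and master Qdrant builds.
DATASET_TO_ENGINE = {
    "laion-small-clip": "qdrant-continuous-benchmark",
    "msmarco-sparse-1M": "qdrant-sparse-vector",
}

for dataset, engine in DATASET_TO_ENGINE.items():
    for qdrant_version in ("ghcr/dev", "docker/master"):
        env = {
            **os.environ,
            "ENGINE_NAME": engine,
            "DATASETS": dataset,
            "QDRANT_VERSION": qdrant_version,
        }
        subprocess.run(["bash", "-x", "tools/run_ci.sh"], env=env, check=True)
```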
7 changes: 7 additions & 0 deletions .pre-commit-config.yaml
@@ -28,3 +28,10 @@ repos:
- id: isort
name: "Sort Imports"
args: ["--profile", "black"]

- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.3.5
hooks:
# Run the linter.
- id: ruff
args: [ --fix ]
14 changes: 11 additions & 3 deletions benchmark/dataset.py
@@ -10,20 +10,28 @@
from dataset_reader.ann_h5_reader import AnnH5Reader
from dataset_reader.base_reader import BaseReader
from dataset_reader.json_reader import JSONReader
from dataset_reader.sparse_reader import SparseReader


@dataclass
class DatasetConfig:
vector_size: int
distance: str
name: str
type: str
path: str

link: Optional[str] = None
schema: Optional[Dict[str, str]] = field(default_factory=dict)
# None in case of sparse vectors:
vector_size: Optional[int] = None
distance: Optional[str] = None


READER_TYPE = {"h5": AnnH5Reader, "jsonl": JSONReader, "tar": AnnCompoundReader}
READER_TYPE = {
"h5": AnnH5Reader,
"jsonl": JSONReader,
"tar": AnnCompoundReader,
"sparse": SparseReader,
}


class Dataset:
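With `vector_size` and `distance` now optional, a sparse dataset can be described by name, type, path, and link alone, and `type` selects the reader. A minimal sketch, assuming `benchmark.dataset` is importable as shown, with field values taken from the `datasets.json` change below:

```python
from benchmark.dataset import READER_TYPE, DatasetConfig

# Sparse datasets carry no dense vector_size/distance; "type" picks the reader.
config = DatasetConfig(
    name="msmarco-sparse-1M",
    type="sparse",
    path="msmarco-sparse/1M",
    link="https://storage.googleapis.com/ann-filtered-benchmark/datasets/msmacro-sparse-1M.tar.gz",
)

reader_class = READER_TYPE[config.type]
print(reader_class.__name__)  # SparseReader
```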
1 change: 1 addition & 0 deletions dataset_reader/ann_compound_reader.py
@@ -33,6 +33,7 @@ def read_queries(self) -> Iterator[Query]:
vector /= np.linalg.norm(vector)
yield Query(
vector=vector.tolist(),
sparse_vector=None,
meta_conditions=row_json["conditions"],
expected_result=row_json["closest_ids"],
expected_scores=row_json["closest_scores"],
5 changes: 4 additions & 1 deletion dataset_reader/ann_h5_reader.py
@@ -22,6 +22,7 @@ def read_queries(self) -> Iterator[Query]:
vector /= np.linalg.norm(vector)
yield Query(
vector=vector.tolist(),
sparse_vector=None,
meta_conditions=None,
expected_result=expected_result.tolist(),
expected_scores=expected_scores.tolist(),
@@ -33,7 +34,9 @@ def read_data(self) -> Iterator[Record]:
for idx, vector in enumerate(data["train"]):
if self.normalize:
vector /= np.linalg.norm(vector)
yield Record(id=idx, vector=vector.tolist(), metadata=None)
yield Record(
id=idx, vector=vector.tolist(), sparse_vector=None, metadata=None
)


if __name__ == "__main__":
12 changes: 10 additions & 2 deletions dataset_reader/base_reader.py
@@ -2,16 +2,24 @@
from typing import Iterator, List, Optional


@dataclass
class SparseVector:
indices: List[int]
values: List[float]


@dataclass
class Record:
id: int
vector: List[float]
vector: Optional[List[float]]
sparse_vector: Optional[SparseVector]
metadata: Optional[dict]


@dataclass
class Query:
vector: List[float]
vector: Optional[List[float]]
sparse_vector: Optional[SparseVector]
meta_conditions: Optional[dict]
expected_result: Optional[List[int]]
expected_scores: Optional[List[float]] = None
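Both dataclasses now carry exactly one of `vector` / `sparse_vector`, with the other left as `None`. A small usage sketch of the updated shapes (ids and values are made up for illustration):

```python
from dataset_reader.base_reader import Query, Record, SparseVector

# Dense point: sparse_vector stays None.
dense = Record(id=0, vector=[0.1, 0.2, 0.3], sparse_vector=None, metadata=None)

# Sparse point: only the non-zero dimensions are stored as parallel indices/values.
sparse = Record(
    id=1,
    vector=None,
    sparse_vector=SparseVector(indices=[4, 17, 42], values=[0.5, 1.25, 0.75]),
    metadata=None,
)

# Queries follow the same convention.
query = Query(
    vector=None,
    sparse_vector=SparseVector(indices=[17], values=[1.0]),
    meta_conditions=None,
    expected_result=[1],
)
```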
9 changes: 7 additions & 2 deletions dataset_reader/json_reader.py
@@ -58,13 +58,18 @@ def read_queries(self) -> Iterator[Query]:
):
# ToDo: add meta_conditions

yield Query(vector=vector, meta_conditions=None, expected_result=neighbours)
yield Query(
vector=vector,
sparse_vector=None,
meta_conditions=None,
expected_result=neighbours,
)

def read_data(self) -> Iterator[Record]:
for idx, (vector, payload) in enumerate(
zip(self.read_vectors(), self.read_payloads())
):
yield Record(id=idx, vector=vector, metadata=payload)
yield Record(id=idx, vector=vector, sparse_vector=None, metadata=payload)


if __name__ == "__main__":
100 changes: 100 additions & 0 deletions dataset_reader/sparse_reader.py
@@ -0,0 +1,100 @@
import os
from pathlib import Path
from typing import Iterator, List, Tuple, Union

import numpy as np

from dataset_reader.base_reader import BaseReader, Query, Record, SparseVector


def read_sparse_matrix_fields(
filename: Union[Path, str]
) -> Tuple[np.array, np.array, np.array]:
"""Read the fields of a CSR matrix without instantiating it"""

with open(filename, "rb") as f:
sizes = np.fromfile(f, dtype="int64", count=3)
n_row, n_col, n_non_zero = sizes
index_pointer = np.fromfile(f, dtype="int64", count=n_row + 1)
assert n_non_zero == index_pointer[-1]
columns = np.fromfile(f, dtype="int32", count=n_non_zero)
assert np.all(columns >= 0) and np.all(columns < n_col)
values = np.fromfile(f, dtype="float32", count=n_non_zero)
return values, columns, index_pointer


def csr_to_sparse_vectors(
values: List[float], columns: List[int], index_pointer: List[int]
) -> Iterator[SparseVector]:
num_rows = len(index_pointer) - 1

for i in range(num_rows):
start = index_pointer[i]
end = index_pointer[i + 1]
row_values, row_indices = [], []
for j in range(start, end):
row_values.append(values[j])
row_indices.append(columns[j])
yield SparseVector(indices=row_indices, values=row_values)


def read_csr_matrix(filename: Union[Path, str]) -> Iterator[SparseVector]:
"""Read a CSR matrix in spmat format"""
values, columns, index_pointer = read_sparse_matrix_fields(filename)
values = values.tolist()
columns = columns.tolist()
index_pointer = index_pointer.tolist()

yield from csr_to_sparse_vectors(values, columns, index_pointer)


def knn_result_read(
filename: Union[Path, str]
) -> Tuple[List[List[int]], List[List[float]]]:
n, d = map(int, np.fromfile(filename, dtype="uint32", count=2))
assert os.stat(filename).st_size == 8 + n * d * (4 + 4)
with open(filename, "rb") as f:
f.seek(4 + 4)
ids = np.fromfile(f, dtype="int32", count=n * d).reshape(n, d).tolist()
scores = np.fromfile(f, dtype="float32", count=n * d).reshape(n, d).tolist()
return ids, scores


class SparseReader(BaseReader):
def __init__(self, path, normalize=False):
self.path = path
self.normalize = normalize

def read_queries(self) -> Iterator[Query]:
queries_path = self.path / "queries.csr"
X = read_csr_matrix(queries_path)

gt_path = self.path / "results.gt"
gt_indices, _ = knn_result_read(gt_path)

for i, sparse_vector in enumerate(X):
yield Query(
vector=None,
sparse_vector=sparse_vector,
meta_conditions=None,
expected_result=gt_indices[i],
)

def read_data(self) -> Iterator[Record]:
data_path = self.path / "data.csr"
X = read_csr_matrix(data_path)

for i, sparse_vector in enumerate(X):
yield Record(id=i, vector=None, sparse_vector=sparse_vector, metadata=None)


if __name__ == "__main__":
vals = [1, 3, 2, 3, 6, 4, 5]
cols = [0, 2, 2, 1, 3, 0, 2]
pointers = [0, 2, 3, 5, 7]
vecs = [vec for vec in csr_to_sparse_vectors(vals, cols, pointers)]

assert vecs[0] == SparseVector(indices=[0, 2], values=[1, 3])
assert vecs[1] == SparseVector(indices=[2], values=[2])
assert vecs[2] == SparseVector(indices=[1, 3], values=[3, 6])
assert vecs[3] == SparseVector(indices=[0, 2], values=[4, 5])
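For reference, a round-trip sketch of the spmat layout `read_sparse_matrix_fields` expects: an int64 header `[n_rows, n_cols, nnz]`, the int64 index pointer (`n_rows + 1` entries), int32 column indices, then float32 values, all concatenated. The `write_csr` helper is hypothetical and only illustrates the binary format:

```python
import numpy as np

from dataset_reader.base_reader import SparseVector
from dataset_reader.sparse_reader import read_csr_matrix


def write_csr(path, values, columns, index_pointer, n_cols):
    # Write the same layout that read_sparse_matrix_fields parses.
    nnz = len(values)
    n_rows = len(index_pointer) - 1
    with open(path, "wb") as f:
        np.array([n_rows, n_cols, nnz], dtype="int64").tofile(f)
        np.array(index_pointer, dtype="int64").tofile(f)
        np.array(columns, dtype="int32").tofile(f)
        np.array(values, dtype="float32").tofile(f)


write_csr(
    "/tmp/tiny.csr",
    values=[1, 3, 2],
    columns=[0, 2, 1],
    index_pointer=[0, 2, 3],
    n_cols=4,
)
vectors = list(read_csr_matrix("/tmp/tiny.csr"))
assert vectors[0] == SparseVector(indices=[0, 2], values=[1, 3])
assert vectors[1] == SparseVector(indices=[1], values=[2])
```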
12 changes: 12 additions & 0 deletions datasets/datasets.json
@@ -66,6 +66,18 @@
"path": "dbpedia-openai-1M-1536-angular/dbpedia_openai_1M",
"link": "https://storage.googleapis.com/ann-filtered-benchmark/datasets/dbpedia_openai_1M.tgz"
},
{
"name": "msmarco-sparse-100K",
"type": "sparse",
"path": "msmarco-sparse/100K",
"link": "https://storage.googleapis.com/ann-filtered-benchmark/datasets/msmacro-sparse-100K.tar.gz"
},
{
"name": "msmarco-sparse-1M",
"type": "sparse",
"path": "msmarco-sparse/1M",
"link": "https://storage.googleapis.com/ann-filtered-benchmark/datasets/msmacro-sparse-1M.tar.gz"
},
{
"name": "h-and-m-2048-angular-filters",
"vector_size": 2048,
9 changes: 9 additions & 0 deletions engine/base_client/__init__.py
@@ -6,3 +6,12 @@

class IncompatibilityError(Exception):
pass


__all__ = [
"BaseClient",
"BaseConfigurator",
"BaseSearcher",
"BaseUploader",
"IncompatibilityError",
]
5 changes: 4 additions & 1 deletion engine/base_client/client.py
@@ -1,7 +1,6 @@
import json
import os
from datetime import datetime
from pathlib import Path
from typing import List

from benchmark import ROOT_DIR
@@ -31,6 +30,10 @@ def __init__(
self.searchers = searchers
self.engine = engine

@property
def sparse_vector_support(self):
return self.configurator.SPARSE_VECTOR_SUPPORT

def save_search_results(
self, dataset_name: str, results: dict, search_id: int, search_params: dict
):
1 change: 1 addition & 0 deletions engine/base_client/configure.py
@@ -4,6 +4,7 @@


class BaseConfigurator:
SPARSE_VECTOR_SUPPORT: bool = False
DISTANCE_MAPPING = {}

def __init__(self, host, collection_params: dict, connection_params: dict):
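Engines opt into sparse benchmarks by overriding this class attribute, which `BaseClient.sparse_vector_support` (above) simply forwards. A hedged sketch, where `SparseCapableConfigurator` and `ensure_compatible` are illustrative names only and the PR's actual compatibility check may live elsewhere:

```python
from engine.base_client import IncompatibilityError
from engine.base_client.configure import BaseConfigurator


# Hypothetical engine configurator: opting into sparse datasets is one attribute.
class SparseCapableConfigurator(BaseConfigurator):
    SPARSE_VECTOR_SUPPORT = True
    DISTANCE_MAPPING = {"dot": "dot"}


# Sketch of the guard this flag enables for dataset/engine pairing.
def ensure_compatible(dataset_type: str, configurator: BaseConfigurator) -> None:
    if dataset_type == "sparse" and not configurator.SPARSE_VECTOR_SUPPORT:
        raise IncompatibilityError("engine does not support sparse vectors")
```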
8 changes: 3 additions & 5 deletions engine/base_client/search.py
@@ -30,13 +30,11 @@ def get_mp_start_method(cls):
return None

@classmethod
def search_one(
cls, vector: List[float], meta_conditions, top: Optional[int]
) -> List[Tuple[int, float]]:
def search_one(cls, query: Query, top: Optional[int]) -> List[Tuple[int, float]]:
raise NotImplementedError()

@classmethod
def _search_one(cls, query, top: Optional[int] = None):
def _search_one(cls, query: Query, top: Optional[int] = None):
if top is None:
top = (
len(query.expected_result)
@@ -45,7 +43,7 @@ def _search_one(cls, query, top: Optional[int] = None):
)

start = time.perf_counter()
search_res = cls.search_one(query.vector, query.meta_conditions, top)
search_res = cls.search_one(query, top)
end = time.perf_counter()

precision = 1.0
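`search_one` now receives the whole `Query` rather than `(vector, meta_conditions, top)`, so each engine decides whether to run a dense or sparse search. A minimal sketch of an adapted searcher; `ExampleSearcher` and the bodies of its branches are placeholders, not part of this PR:

```python
from typing import List, Optional, Tuple

from dataset_reader.base_reader import Query
from engine.base_client.search import BaseSearcher


class ExampleSearcher(BaseSearcher):
    @classmethod
    def search_one(cls, query: Query, top: Optional[int]) -> List[Tuple[int, float]]:
        if query.sparse_vector is not None:
            # issue the engine's sparse search using
            # query.sparse_vector.indices / query.sparse_vector.values
            candidates: List[Tuple[int, float]] = []
        else:
            # issue the engine's dense search using
            # query.vector and query.meta_conditions
            candidates = []
        return candidates[:top] if top is not None else candidates
```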
13 changes: 4 additions & 9 deletions engine/base_client/upload.py
@@ -1,6 +1,6 @@
import time
from multiprocessing import get_context
from typing import Iterable, List, Optional, Tuple
from typing import Iterable, List

import tqdm

@@ -80,22 +80,17 @@ def upload(
}

@classmethod
def _upload_batch(
cls, batch: Tuple[List[int], List[list], List[Optional[dict]]]
) -> float:
ids, vectors, metadata = batch
def _upload_batch(cls, batch: List[Record]) -> float:
start = time.perf_counter()
cls.upload_batch(ids, vectors, metadata)
cls.upload_batch(batch)
return time.perf_counter() - start

@classmethod
def post_upload(cls, distance):
return {}

@classmethod
def upload_batch(
cls, ids: List[int], vectors: List[list], metadata: List[Optional[dict]]
):
def upload_batch(cls, batch: List[Record]):
raise NotImplementedError()

@classmethod
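`upload_batch` now takes a single `List[Record]` instead of parallel `(ids, vectors, metadata)` lists. A minimal sketch of an adapted uploader; `ExampleUploader` is a placeholder and the final hand-off to the engine client is elided:

```python
from typing import List

from dataset_reader.base_reader import Record
from engine.base_client.upload import BaseUploader


class ExampleUploader(BaseUploader):
    @classmethod
    def upload_batch(cls, batch: List[Record]):
        for record in batch:
            if record.sparse_vector is not None:
                # sparse points travel as parallel indices/values lists
                payload_vector = {
                    "indices": record.sparse_vector.indices,
                    "values": record.sparse_vector.values,
                }
            else:
                payload_vector = record.vector
            # hand (record.id, payload_vector, record.metadata) to the engine client here
            _ = (record.id, payload_vector, record.metadata)
```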