Commit d3755ec

Merge pull request #40 from mpdimitr/mixed-workload

Mixed workload

2 parents: 80f7f5f + dd663f4

File tree

2 files changed: +48, -6 lines


engine/base_client/search.py

Lines changed: 13 additions & 1 deletion

@@ -20,7 +20,7 @@
 
 
 class BaseSearcher:
-    _doc_id_counter = itertools.count(1000000)
+    _doc_id_counter = itertools.count(1000000000)  # Start from 1 billion to avoid conflicts
     MP_CONTEXT = None
 
     def __init__(self, host, connection_params, search_params):
@@ -73,6 +73,9 @@ def _insert_one(cls, query):
 
         # Generate unique doc_id here
        doc_id = next(cls._doc_id_counter)
+
+        # Debug logging to verify inserts are happening
+        #print(f"DEBUG: Inserting vector with doc_id={doc_id}")
 
        cls.insert_one(str(doc_id), query.vector, query.meta_conditions)
        end = time.perf_counter()
@@ -266,10 +269,19 @@ def worker_function(self, distance, search_one, insert_one, chunk, result_queue,
 
 def process_chunk(chunk, search_one, insert_one, insert_fraction):
     results = []
+    insert_count = 0
+    search_count = 0
+
+    #print(f"DEBUG: Processing chunk of {len(chunk)} queries with insert_fraction={insert_fraction}")
+
     for i, query in enumerate(chunk):
         if random.random() < insert_fraction:
             result = insert_one(query)
+            insert_count += 1
         else:
             result = search_one(query)
+            search_count += 1
         results.append(result)
+
+    #print(f"DEBUG: Chunk complete - {search_count} searches, {insert_count} inserts")
     return results
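
For context, the insert_fraction split in process_chunk can be exercised in isolation. The following is a minimal, self-contained sketch, not the benchmark itself: fake_search, fake_insert, and the integer queries are hypothetical stand-ins for the real search_one/insert_one callables and query objects.

import itertools
import random

# Hypothetical stand-ins for the benchmark's search_one/insert_one callables.
_doc_id_counter = itertools.count(1000000000)  # same starting offset as the patch

def fake_search(query):
    return ("search", query)

def fake_insert(query):
    return ("insert", next(_doc_id_counter))

def process_chunk(chunk, search_one, insert_one, insert_fraction):
    results = []
    for query in chunk:
        # With probability insert_fraction the query becomes a write instead of a read.
        if random.random() < insert_fraction:
            results.append(insert_one(query))
        else:
            results.append(search_one(query))
    return results

random.seed(42)
out = process_chunk(range(1000), fake_search, fake_insert, insert_fraction=0.1)
inserts = sum(1 for kind, _ in out if kind == "insert")
print(f"{inserts} inserts out of {len(out)} operations")  # roughly 100

Because the split is driven by random.random(), each chunk sees approximately, not exactly, the requested fraction of inserts; the insert_count/search_count tallies in the patch exist to verify that ratio.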

engine/clients/redis/search.py

Lines changed: 35 additions & 5 deletions

@@ -112,9 +112,39 @@ def insert_one(cls, doc_id: int, vector, meta_conditions):
         else:
             vec_param = vector
 
-        doc = {"vector": vec_param}
-        if meta_conditions:
-            for k, v in meta_conditions.items():
-                doc[k] = str(v)
+        # Process metadata exactly like upload_batch does
+        meta = meta_conditions if meta_conditions else {}
+        geopoints = {}
+        payload = {}
+
+        if meta is not None:
+            for k, v in meta.items():
+                # This is a patch for arxiv-titles dataset where we have a list of "labels", and
+                # we want to index all of them under the same TAG field (whose separator is ';').
+                if k == "labels":
+                    payload[k] = ";".join(v)
+                if (
+                    v is not None
+                    and not isinstance(v, dict)
+                    and not isinstance(v, list)
+                ):
+                    payload[k] = v
+            # Redis treats geopoints differently and requires putting them as
+            # a comma-separated string with lat and lon coordinates
+            from engine.clients.redis.helper import convert_to_redis_coords
+            geopoints = {
+                k: ",".join(map(str, convert_to_redis_coords(v["lon"], v["lat"])))
+                for k, v in meta.items()
+                if isinstance(v, dict)
+            }
 
-        cls.client.hset(str(doc_id), mapping=doc)
+        #print(f"DEBUG: Redis inserting doc_id={doc_id}, vector_size={len(vec_param)} bytes")
+        cls.client.hset(
+            str(doc_id),
+            mapping={
+                "vector": vec_param,
+                **payload,
+                **geopoints,
+            },
+        )
+        #print(f"DEBUG: Redis insert complete for doc_id={doc_id}")
