Skip to content

Commit 20d0245

Browse files
committed
Persist AckQueue for indexing
1 parent bae27e9 commit 20d0245

File tree

11 files changed

+102
-83
lines changed

11 files changed

+102
-83
lines changed

app/data_sources/bookstack.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from data_source_api.base_data_source import BaseDataSource, ConfigField, HTMLInputType
77
from data_source_api.exception import InvalidDataSourceConfig
88
from data_source_api.utils import parse_with_workers
9-
from indexing_queue import IndexingQueue
9+
from index_queue import IndexQueue
1010
from parsers.html import html_to_text
1111
from pydantic import BaseModel
1212
from requests import Session, HTTPError
@@ -194,10 +194,10 @@ def _parse_documents_worker(self, raw_docs: List[Dict]):
194194
type=DocumentType.DOCUMENT))
195195
if len(parsed_docs) >= 50:
196196
total_fed += len(parsed_docs)
197-
IndexingQueue.get().feed(docs=parsed_docs)
197+
IndexQueue.get_instance().put(docs=parsed_docs)
198198
parsed_docs = []
199199

200-
IndexingQueue.get().feed(docs=parsed_docs)
200+
IndexQueue.get_instance().put(docs=parsed_docs)
201201
total_fed += len(parsed_docs)
202202
if total_fed > 0:
203203
logging.info(f"Worker fed {total_fed} documents")

app/data_sources/confluence.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from data_source_api.base_data_source import BaseDataSource, ConfigField, HTMLInputType
99
from data_source_api.exception import InvalidDataSourceConfig
1010
from data_source_api.utils import parse_with_workers
11-
from indexing_queue import IndexingQueue
11+
from index_queue import IndexQueue
1212
from parsers.html import html_to_text
1313
from pydantic import BaseModel
1414

@@ -115,10 +115,10 @@ def _parse_documents_worker(self, raw_docs: List[Dict]):
115115
type=DocumentType.DOCUMENT))
116116
if len(parsed_docs) >= 50:
117117
total_fed += len(parsed_docs)
118-
IndexingQueue.get().feed(docs=parsed_docs)
118+
IndexQueue.get_instance().put(docs=parsed_docs)
119119
parsed_docs = []
120120

121-
IndexingQueue.get().feed(docs=parsed_docs)
121+
IndexQueue.get_instance().put(docs=parsed_docs)
122122
total_fed += len(parsed_docs)
123123
if total_fed > 0:
124124
logging.info(f'Worker fed {total_fed} documents')

app/data_sources/google_drive.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
from data_source_api.base_data_source import BaseDataSource, ConfigField, HTMLInputType
1818
from data_source_api.basic_document import BasicDocument, DocumentType, FileType
1919
from data_source_api.exception import InvalidDataSourceConfig, KnownException
20-
from indexing_queue import IndexingQueue
20+
from index_queue import IndexQueue
2121
from parsers.html import html_to_text
2222
from parsers.pptx import pptx_to_text
2323
from parsers.docx import docx_to_html
@@ -188,7 +188,7 @@ def _index_files_from_drive(self, drive) -> List[dict]:
188188
file_type=FileType.from_mime_type(mime_type=file['mimeType'])
189189
))
190190

191-
IndexingQueue.get().feed(documents)
191+
IndexQueue.get_instance().put(documents)
192192

193193
def _get_all_drives(self) -> List[dict]:
194194
return [{'name': 'My Drive', 'id': None}] \

app/data_sources/mattermost.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from data_source_api.basic_document import BasicDocument, DocumentType
1212
from data_source_api.exception import InvalidDataSourceConfig
1313
from data_source_api.utils import parse_with_workers
14-
from indexing_queue import IndexingQueue
14+
from index_queue import IndexQueue
1515

1616
logger = logging.getLogger(__name__)
1717

@@ -150,7 +150,7 @@ def _feed_channel(self, channel: MattermostChannel):
150150
parsed_posts.append(last_message)
151151
if len(parsed_posts) >= MattermostDataSource.FEED_BATCH_SIZE:
152152
total_fed += len(parsed_posts)
153-
IndexingQueue.get().feed(docs=parsed_posts)
153+
IndexQueue.get_instance().put(docs=parsed_posts)
154154
parsed_posts = []
155155

156156
author_image_url = f"{self._get_mattermost_url()}/api/v4/users/{post['user_id']}/image?_=0"
@@ -175,7 +175,7 @@ def _feed_channel(self, channel: MattermostChannel):
175175
break
176176
page += 1
177177

178-
IndexingQueue.get().feed(docs=parsed_posts)
178+
IndexQueue.get_instance().put(docs=parsed_posts)
179179
total_fed += len(parsed_posts)
180180

181181
if len(parsed_posts) > 0:

app/data_sources/rocketchat.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
from data_source_api.base_data_source import BaseDataSource, ConfigField, HTMLInputType
1010
from data_source_api.basic_document import DocumentType, BasicDocument
1111
from data_source_api.exception import InvalidDataSourceConfig
12-
from indexing_queue import IndexingQueue
12+
from index_queue import IndexQueue
1313

1414

1515
@dataclass
@@ -191,7 +191,7 @@ def _feed_new_documents(self) -> None:
191191
documents.append(last_msg)
192192

193193
logging.info(f"Total messages : {len(documents)}")
194-
IndexingQueue.get().feed(docs=documents)
194+
IndexQueue.get_instance().put(docs=documents)
195195

196196

197197
if __name__ == "__main__":

app/data_sources/slack.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from data_source_api.base_data_source import BaseDataSource, ConfigField, HTMLInputType
1111
from data_source_api.basic_document import DocumentType, BasicDocument
1212
from data_source_api.utils import parse_with_workers
13-
from indexing_queue import IndexingQueue
13+
from index_queue import IndexQueue
1414

1515
logger = logging.getLogger(__name__)
1616

@@ -125,7 +125,7 @@ def _feed_conversation(self, conv):
125125
documents.append(last_msg)
126126
if len(documents) == SlackDataSource.FEED_BATCH_SIZE:
127127
total_fed += SlackDataSource.FEED_BATCH_SIZE
128-
IndexingQueue.get().feed(docs=documents)
128+
IndexQueue.get_instance().put(docs=documents)
129129
documents = []
130130

131131
timestamp = message['ts']
@@ -141,7 +141,7 @@ def _feed_conversation(self, conv):
141141
if last_msg is not None:
142142
documents.append(last_msg)
143143

144-
IndexingQueue.get().feed(docs=documents)
144+
IndexQueue.get_instance().put(docs=documents)
145145
total_fed += len(documents)
146146
if total_fed > 0:
147147
logger.info(f'Slack worker fed {total_fed} documents')
@@ -151,7 +151,7 @@ def _fetch_conversation_messages(self, conv):
151151
cursor = None
152152
has_more = True
153153
last_index_unix = self._last_index_time.timestamp()
154-
logger.info(f'Fetching messages for conversation {conv.name} since {last_index_unix}')
154+
logger.info(f'Fetching messages for conversation {conv.name}')
155155

156156
while has_more:
157157
response = self._slack.conversations_history(channel=conv.id, oldest=str(last_index_unix),

app/index_queue.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
import threading
from dataclasses import dataclass
from typing import List

from persistqueue import SQLiteAckQueue

from data_source_api.basic_document import BasicDocument
from paths import SQLITE_TASKS_PATH


@dataclass
class IndexQueueItem:
    # persistqueue record id ('pqid') needed to ack the item after indexing.
    queue_item_id: int
    # The document payload to index.
    doc: BasicDocument


class IndexQueue(SQLiteAckQueue):
    """Singleton, SQLite-backed ack-queue of documents awaiting indexing.

    Items are persisted to disk (SQLITE_TASKS_PATH), so pending documents
    survive a process restart; they are removed only once the consumer
    explicitly acks them after successful indexing.
    """

    __instance = None
    __lock = threading.Lock()

    @classmethod
    def get_instance(cls) -> "IndexQueue":
        """Return the process-wide queue, creating it on first use.

        The lock is taken on every call so concurrent first callers cannot
        both construct an instance.
        """
        with cls.__lock:
            if cls.__instance is None:
                cls.__instance = cls()
            return cls.__instance

    def __init__(self):
        if IndexQueue.__instance is not None:
            # Fixed message: the original said "use .get()", but the
            # accessor is .get_instance() (and .get() is the inherited
            # SQLiteAckQueue pop operation — telling callers to use it
            # would be actively wrong).
            raise RuntimeError("Queue is a singleton, use .get_instance() to get the instance")

        # Wakes a consumer blocked in consume_all() as soon as put() adds work.
        self.condition = threading.Condition()
        super().__init__(path=SQLITE_TASKS_PATH, multithreading=True, name="index")

    def put_single(self, doc: BasicDocument) -> None:
        """Enqueue a single document."""
        self.put([doc])

    def put(self, docs: List[BasicDocument]) -> None:
        """Enqueue a batch of documents and notify any waiting consumer.

        Note: intentionally narrows SQLiteAckQueue.put to take a list of
        BasicDocument; each document is persisted as its own queue record.
        """
        with self.condition:
            for doc in docs:
                super().put(doc)

            self.condition.notify_all()

    def consume_all(self, max_docs: int = 5000, timeout: int = 1) -> List[IndexQueueItem]:
        """Drain up to `max_docs` pending items and return them un-acked.

        Always waits up to `timeout` seconds first (or until put() notifies);
        this acts as a small batching window so producers can accumulate a
        chunk. Returns [] when nothing is pending. Callers must ack() each
        returned queue_item_id after indexing, or the items will be
        redelivered.
        """
        with self.condition:
            self.condition.wait(timeout=timeout)

            queue_items = []
            count = 0
            while not super().empty() and count < max_docs:
                # raw=True yields a dict with the record id ('pqid') and the
                # stored payload ('data') — the id is what ack() needs later.
                raw_item = super().get(raw=True)
                queue_items.append(IndexQueueItem(queue_item_id=raw_item['pqid'], doc=raw_item['data']))
                count += 1

            return queue_items

app/indexing/background_indexer.py

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
11
import logging
22
import threading
3+
from typing import List
34

4-
from indexing_queue import IndexingQueue
5+
from index_queue import IndexQueue
56
from indexing.index_documents import Indexer
67

78

9+
logger = logging.getLogger()
10+
11+
812
class BackgroundIndexer:
913
_thread = None
1014
_stop_event = threading.Event()
@@ -31,17 +35,26 @@ def stop(cls):
3135

3236
@staticmethod
3337
def run():
34-
logger = logging.getLogger()
35-
docs_queue_instance = IndexingQueue.get()
38+
docs_queue_instance = IndexQueue.get_instance()
3639
logger.info(f'Background indexer started...')
3740

3841
while not BackgroundIndexer._stop_event.is_set():
39-
docs_chunk = docs_queue_instance.consume_all()
40-
if not docs_chunk:
42+
queue_items = docs_queue_instance.consume_all()
43+
if not queue_items:
4144
continue
4245

43-
BackgroundIndexer._currently_indexing_count = len(docs_chunk)
44-
logger.info(f'Got chunk of {len(docs_chunk)} documents')
45-
Indexer.index_documents(docs_chunk)
46-
logger.info(f'Finished indexing chunk of {len(docs_chunk)} documents')
47-
BackgroundIndexer._currently_indexing_count = 0
46+
BackgroundIndexer._currently_indexing_count = len(queue_items)
47+
logger.info(f'Got chunk of {len(queue_items)} documents')
48+
49+
docs = [doc.doc for doc in queue_items]
50+
Indexer.index_documents(docs)
51+
BackgroundIndexer._ack_chunk(docs_queue_instance, [doc.queue_item_id for doc in queue_items])
52+
53+
@staticmethod
54+
def _ack_chunk(queue: IndexQueue, ids: List[int]):
55+
logger.info(f'Finished indexing chunk of {len(ids)} documents')
56+
for item_id in ids:
57+
queue.ack(id=item_id)
58+
59+
logger.info(f'Acked {len(ids)} documents.')
60+
BackgroundIndexer._currently_indexing_count = 0

app/indexing_queue.py

Lines changed: 0 additions & 51 deletions
This file was deleted.

app/main.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from indexing.background_indexer import BackgroundIndexer
2121
from indexing.bm25_index import Bm25Index
2222
from indexing.faiss_index import FaissIndex
23-
from indexing_queue import IndexingQueue
23+
from index_queue import IndexQueue
2424
from paths import UI_PATH
2525
from schemas import DataSource
2626
from schemas.data_source_type import DataSourceType
@@ -138,7 +138,7 @@ class Status:
138138
docs_left_to_index: int
139139

140140
return Status(docs_in_indexing=BackgroundIndexer.get_currently_indexing(),
141-
docs_left_to_index=IndexingQueue.get().get_how_many_left())
141+
docs_left_to_index=IndexQueue.get_instance().qsize())
142142

143143

144144
@app.post("/clear-index")
@@ -157,6 +157,6 @@ async def clear_index():
157157
logger.warning(f"Failed to mount UI (you probably need to build it): {e}")
158158

159159

160-
# if __name__ == '__main__':
161-
# import uvicorn
162-
# uvicorn.run("main:app", host="localhost", port=8000)
160+
if __name__ == '__main__':
161+
import uvicorn
162+
uvicorn.run("main:app", host="localhost", port=8000)

0 commit comments

Comments
 (0)