From 19e727d05de6cff783bfd6fc3ce98f187fe25819 Mon Sep 17 00:00:00 2001 From: Dmitry Yashunin Date: Tue, 20 Sep 2022 08:31:11 +0200 Subject: [PATCH 01/27] Replace deleted elements at insertion --- hnswlib/hnswalg.h | 60 +++++++- python_bindings/bindings.cpp | 120 ++++++++++++++- .../tests/bindings_test_replace.py | 143 ++++++++++++++++++ 3 files changed, 312 insertions(+), 11 deletions(-) create mode 100644 python_bindings/tests/bindings_test_replace.py diff --git a/hnswlib/hnswalg.h b/hnswlib/hnswalg.h index 32b173e1..5aa4e95f 100644 --- a/hnswlib/hnswalg.h +++ b/hnswlib/hnswalg.h @@ -66,6 +66,11 @@ class HierarchicalNSW : public AlgorithmInterface { mutable std::atomic metric_distance_computations{0}; mutable std::atomic metric_hops{0}; + bool replace_deleted = false; + + std::mutex deleted_elements_lock; + std::unordered_set deleted_elements; + HierarchicalNSW(SpaceInterface *s) { } @@ -75,7 +80,9 @@ class HierarchicalNSW : public AlgorithmInterface { SpaceInterface *s, const std::string &location, bool nmslib = false, - size_t max_elements = 0) { + size_t max_elements = 0, + bool replace_deleted = false) + : replace_deleted(replace_deleted) { loadIndex(location, s, max_elements); } @@ -85,10 +92,12 @@ class HierarchicalNSW : public AlgorithmInterface { size_t max_elements, size_t M = 16, size_t ef_construction = 200, - size_t random_seed = 100) + size_t random_seed = 100, + bool replace_deleted = false) : link_list_locks_(max_elements), link_list_update_locks_(max_update_element_locks), - element_levels_(max_elements) { + element_levels_(max_elements), + replace_deleted(replace_deleted) { max_elements_ = max_elements; num_deleted_ = 0; data_size_ = s->get_data_size(); @@ -694,6 +703,7 @@ class HierarchicalNSW : public AlgorithmInterface { for (size_t i = 0; i < cur_element_count; i++) { if (isMarkedDeleted(i)) { num_deleted_ += 1; + if (replace_deleted) deleted_elements.insert(i); } } @@ -747,6 +757,7 @@ class HierarchicalNSW : public AlgorithmInterface { unsigned char *ll_cur = ((unsigned char *)get_linklist0(internalId))+2; *ll_cur |= DELETE_MARK; num_deleted_ += 1; + if (replace_deleted) deleted_elements.insert(internalId); } else { throw std::runtime_error("The requested to delete element is already deleted"); } @@ -775,6 +786,7 @@ class HierarchicalNSW : public AlgorithmInterface { unsigned char *ll_cur = ((unsigned char *)get_linklist0(internalId)) + 2; *ll_cur &= ~DELETE_MARK; num_deleted_ -= 1; + if (replace_deleted) deleted_elements.erase(internalId); } else { throw std::runtime_error("The requested to undelete element is not deleted"); } @@ -800,6 +812,48 @@ class HierarchicalNSW : public AlgorithmInterface { } + /** + * Adds point and replaces previously deleted point if any, updating it with new point + * + * If deleted point was replaced returns its label, else returns label of added point + */ + labeltype insertPoint(const void* data_point, labeltype label) { + if (!replace_deleted) { + throw std::runtime_error("Don't use insertPoint when replacement of deleted elements is disabled"); + } + + std::unique_lock tmp_del_el_lock(deleted_elements_lock); + bool is_empty = deleted_elements.empty(); + tmp_del_el_lock.unlock(); + + if (is_empty) { + addPoint(data_point, label); + return label; + } + else { + tmp_del_el_lock.lock(); + tableint id_replace = *deleted_elements.begin(); + deleted_elements.erase(id_replace); + tmp_del_el_lock.unlock(); + + // use link list locks to not block calls for other elements + std::unique_lock lock_label_update(link_list_update_locks_[(id_replace & (max_update_element_locks - 1))]); + labeltype label_replace = getExternalLabel(id_replace); + setExternalLabel(id_replace, label); + lock_label_update.unlock(); + + std::unique_lock tmp_label_lookup_lock(label_lookup_lock); + label_lookup_.erase(label_replace); + label_lookup_[label] = id_replace; + tmp_label_lookup_lock.unlock(); + + addPoint(data_point, label); + + return label_replace; + } + } + + /** * Adds point. Updates the point if it is already in the index */ diff --git a/python_bindings/bindings.cpp b/python_bindings/bindings.cpp index 85751c0b..668b7a95 100644 --- a/python_bindings/bindings.cpp +++ b/python_bindings/bindings.cpp @@ -178,12 +178,13 @@ class Index { size_t maxElements, size_t M, size_t efConstruction, - size_t random_seed) { + size_t random_seed, + bool replace_deleted) { if (appr_alg) { throw std::runtime_error("The index is already initiated."); } cur_l = 0; - appr_alg = new hnswlib::HierarchicalNSW(l2space, maxElements, M, efConstruction, random_seed); + appr_alg = new hnswlib::HierarchicalNSW(l2space, maxElements, M, efConstruction, random_seed, replace_deleted); index_inited = true; ep_added = false; appr_alg->ef_ = default_ef; @@ -208,12 +209,12 @@ class Index { } - void loadIndex(const std::string &path_to_index, size_t max_elements) { + void loadIndex(const std::string &path_to_index, size_t max_elements, bool replace_deleted) { if (appr_alg) { std::cerr << "Warning: Calling load_index for an already inited index. Old index is being deallocated." << std::endl; delete appr_alg; } - appr_alg = new hnswlib::HierarchicalNSW(l2space, path_to_index, false, max_elements); + appr_alg = new hnswlib::HierarchicalNSW(l2space, path_to_index, false, max_elements, replace_deleted); cur_l = appr_alg->cur_element_count; index_inited = true; } @@ -229,6 +230,78 @@ class Index { } + py::object insertItems_return_numpy(py::object input, py::object ids_ = py::none(), int num_threads = -1) { + size_t rows, features; + hnswlib::labeltype* data_numpy_l = NULL; + + py::array_t < dist_t, py::array::c_style | py::array::forcecast > items(input); + auto buffer = items.request(); + if (num_threads <= 0) + num_threads = num_threads_default; + get_input_array_shapes(buffer, &rows, &features); + if (features != dim) + throw std::runtime_error("wrong dimensionality of the vectors"); + + // avoid using threads when the number of insertions is small: + if (rows <= num_threads * 4) { + num_threads = 1; + } + + std::vector ids = get_input_ids_and_check_shapes(ids_, rows); + + { + int start = 0; + data_numpy_l = new hnswlib::labeltype[rows]; + + if (!ep_added) { + size_t id = ids.size() ? ids.at(0) : (cur_l); + float* vector_data = (float*)items.data(0); + std::vector norm_array(dim); + if (normalize) { + normalize_vector(vector_data, norm_array.data()); + vector_data = norm_array.data(); + } + hnswlib::labeltype label = appr_alg->insertPoint((void*)vector_data, (size_t)id); + data_numpy_l[start] = label; + start = 1; + ep_added = true; + } + + py::gil_scoped_release l; + if (normalize == false) { + ParallelFor(start, rows, num_threads, [&](size_t row, size_t threadId) { + size_t id = ids.size() ? ids.at(row) : (cur_l + row); + hnswlib::labeltype label = appr_alg->insertPoint((void*)items.data(row), (size_t)id); + data_numpy_l[row] = label; + }); + } + else { + std::vector norm_array(num_threads * dim); + ParallelFor(start, rows, num_threads, [&](size_t row, size_t threadId) { + // normalize vector: + size_t start_idx = threadId * dim; + normalize_vector((float*)items.data(row), (norm_array.data() + start_idx)); + + size_t id = ids.size() ? ids.at(row) : (cur_l + row); + hnswlib::labeltype label = appr_alg->insertPoint((void*)(norm_array.data() + start_idx), (size_t)id); + data_numpy_l[row] = label; + }); + } + cur_l += rows; + } + + py::capsule free_when_done_l(data_numpy_l, [](void* f) { + delete[] f; + }); + + return py::array_t( + { rows }, // shape + { sizeof(hnswlib::labeltype) }, // C-style contiguous strides for each index + data_numpy_l, // the data pointer + free_when_done_l); + } + + void addItems(py::object input, py::object ids_ = py::none(), int num_threads = -1) { py::array_t < dist_t, py::array::c_style | py::array::forcecast > items(input); auto buffer = items.request(); @@ -399,6 +472,7 @@ class Index { "ef"_a = appr_alg->ef_, "has_deletions"_a = (bool)appr_alg->num_deleted_, "size_links_per_element"_a = appr_alg->size_links_per_element_, + "replace_deleted"_a = appr_alg->replace_deleted, "label_lookup_external"_a = py::array_t( { appr_alg->label_lookup_.size() }, // shape @@ -561,12 +635,19 @@ class Index { } // process deleted elements + bool replace_deleted = false; + if (d.contains("replace_deleted")) { + replace_deleted = d["replace_deleted"].cast(); + } + appr_alg->replace_deleted = replace_deleted; + appr_alg->num_deleted_ = 0; bool has_deletions = d["has_deletions"].cast(); if (has_deletions) { for (size_t i = 0; i < appr_alg->cur_element_count; i++) { if (appr_alg->isMarkedDeleted(i)) { appr_alg->num_deleted_ += 1; + if (replace_deleted) appr_alg->deleted_elements.insert(i); } } } @@ -843,15 +924,38 @@ PYBIND11_PLUGIN(hnswlib) { /* WARNING: Index::createFromIndex is not thread-safe with Index::addItems */ .def(py::init(&Index::createFromIndex), py::arg("index")) .def(py::init(), py::arg("space"), py::arg("dim")) - .def("init_index", &Index::init_new_index, py::arg("max_elements"), py::arg("M") = 16, py::arg("ef_construction") = 200, py::arg("random_seed") = 100) - .def("knn_query", &Index::knnQuery_return_numpy, py::arg("data"), py::arg("k") = 1, py::arg("num_threads") = -1) - .def("add_items", &Index::addItems, py::arg("data"), py::arg("ids") = py::none(), py::arg("num_threads") = -1) + .def("init_index", + &Index::init_new_index, + py::arg("max_elements"), + py::arg("M") = 16, + py::arg("ef_construction") = 200, + py::arg("random_seed") = 100, + py::arg("replace_deleted") = false) + .def("knn_query", + &Index::knnQuery_return_numpy, + py::arg("data"), + py::arg("k") = 1, + py::arg("num_threads") = -1) + .def("add_items", + &Index::addItems, + py::arg("data"), + py::arg("ids") = py::none(), + py::arg("num_threads") = -1) + .def("insert_items", + &Index::insertItems_return_numpy, + py::arg("data"), + py::arg("ids") = py::none(), + py::arg("num_threads") = -1) .def("get_items", &Index::getDataReturnList, py::arg("ids") = py::none()) .def("get_ids_list", &Index::getIdsList) .def("set_ef", &Index::set_ef, py::arg("ef")) .def("set_num_threads", &Index::set_num_threads, py::arg("num_threads")) .def("save_index", &Index::saveIndex, py::arg("path_to_index")) - .def("load_index", &Index::loadIndex, py::arg("path_to_index"), py::arg("max_elements") = 0) + .def("load_index", + &Index::loadIndex, + py::arg("path_to_index"), + py::arg("max_elements") = 0, + py::arg("replace_deleted") = false) .def("mark_deleted", &Index::markDeleted, py::arg("label")) .def("unmark_deleted", &Index::unmarkDeleted, py::arg("label")) .def("resize_index", &Index::resizeIndex, py::arg("new_size")) diff --git a/python_bindings/tests/bindings_test_replace.py b/python_bindings/tests/bindings_test_replace.py new file mode 100644 index 00000000..366e80f2 --- /dev/null +++ b/python_bindings/tests/bindings_test_replace.py @@ -0,0 +1,143 @@ +import os +import pickle +import unittest + +import numpy as np + +import hnswlib + + +class RandomSelfTestCase(unittest.TestCase): + def testRandomSelf(self): + dim = 16 + num_elements = 5000 + max_num_elements = 2 * num_elements + + # Generating sample data + print("Generating data") + # batch 1 + first_id = 0 + last_id = num_elements + labels1 = np.arange(first_id, last_id) + data1 = np.float32(np.random.random((num_elements, dim))) + # batch 2 + first_id += num_elements + last_id += num_elements + labels2 = np.arange(first_id, last_id) + data2 = np.float32(np.random.random((num_elements, dim))) + # batch 3 + first_id += num_elements + last_id += num_elements + labels3 = np.arange(first_id, last_id) + data3 = np.float32(np.random.random((num_elements, dim))) + # batch 4 + first_id += num_elements + last_id += num_elements + labels4 = np.arange(first_id, last_id) + data4 = np.float32(np.random.random((num_elements, dim))) + + # Declaring index + hnsw_index = hnswlib.Index(space='l2', dim=dim) + hnsw_index.init_index(max_elements=max_num_elements, ef_construction=200, M=16, replace_deleted=True) + + hnsw_index.set_ef(100) + hnsw_index.set_num_threads(4) + + # Add batch 1 and 2 + print("Adding batch 1") + hnsw_index.add_items(data1, labels1) + print("Adding batch 2") + hnsw_index.add_items(data2, labels2) # maximum number of elements is reached + + # Delete nearest neighbors of batch 2 + print("Deleting neighbors of batch 2") + labels2_deleted, _ = hnsw_index.knn_query(data2, k=1) + for l in labels2_deleted: + hnsw_index.mark_deleted(l[0]) + labels1_found, _ = hnsw_index.knn_query(data1, k=1) + items = hnsw_index.get_items(labels1) + diff_with_gt_labels = np.mean(np.abs(data1-items)) + self.assertAlmostEqual(diff_with_gt_labels, 0, delta=1e-3) + + labels2_after, _ = hnsw_index.knn_query(data2, k=1) + for la in labels2_after: + for lb in labels2_deleted: + if la[0] == lb[0]: + self.assertTrue(False) + print("All the neighbors of data2 are removed") + + # Replace deleted elements + print("Inserting batch 3 by replacing deleted elements") + # Maximum number of elements is reached therefore we cannot add new items + # but we can replace the deleted ones + labels_replaced = hnsw_index.insert_items(data3, labels3) + labels2_deleted_list = [l[0] for l in labels2_deleted] + labels_replaced_list = labels_replaced.tolist() + labels2_deleted_list.sort() + labels_replaced_list.sort() + self.assertSequenceEqual(labels2_deleted_list, labels_replaced_list) + + # After replacing, all labels should be retrievable + print("Checking that remaining labels are in index") + # Get remaining data from batch 1 and batch 2 after deletion of elements + remaining_labels = set(labels1) | set(labels2) + remaining_labels = remaining_labels - set(labels2_deleted_list) + remaining_labels_list = list(remaining_labels) + comb_data = np.concatenate((data1, data2), axis=0) + remaining_data = comb_data[remaining_labels_list] + + returned_items = hnsw_index.get_items(remaining_labels_list) + self.assertSequenceEqual(remaining_data.tolist(), returned_items) + + returned_items = hnsw_index.get_items(labels3) + self.assertSequenceEqual(data3.tolist(), returned_items) + + # Check index serialization + # Delete batch 3 + print("Deleting batch 3") + for l in labels3: + hnsw_index.mark_deleted(l) + + # Save index + index_path = "index.bin" + print(f"Saving index to {index_path}") + hnsw_index.save_index(index_path) + del hnsw_index + + # Reinit and load the index + hnsw_index = hnswlib.Index(space='l2', dim=dim) # the space can be changed - keeps the data, alters the distance function. + hnsw_index.set_num_threads(4) + print(f"Loading index from {index_path}") + hnsw_index.load_index(index_path, max_elements=max_num_elements, replace_deleted=True) + + # Insert batch 4 + print("Inserting batch 4 by replacing deleted elements") + labels_replaced = hnsw_index.insert_items(data4, labels4) + + # Check recall + print("Checking recall") + labels_found, _ = hnsw_index.knn_query(data4, k=1) + recall = np.mean(labels_found.reshape(-1) == labels4) + print(f"Recall for the 4 batch: {recall}") + self.assertGreater(recall, 0.98) + + # Delete batch 4 + print("Deleting batch 4") + for l in labels4: + hnsw_index.mark_deleted(l) + + print("Testing pickle serialization") + hnsw_index_pckl = pickle.loads(pickle.dumps(hnsw_index)) + del hnsw_index + # Insert batch 3 + print("Inserting batch 3 by replacing deleted elements") + labels_replaced = hnsw_index_pckl.insert_items(data3, labels3) + + # Check recall + print("Checking recall") + labels_found, _ = hnsw_index_pckl.knn_query(data3, k=1) + recall = np.mean(labels_found.reshape(-1) == labels3) + print(f"Recall for the 3 batch: {recall}") + self.assertGreater(recall, 0.98) + + os.remove(index_path) From 812b3a40a33cf10e6a617a144028635415b21de6 Mon Sep 17 00:00:00 2001 From: Dmitry Yashunin Date: Tue, 20 Sep 2022 08:59:44 +0200 Subject: [PATCH 02/27] Refactoring --- python_bindings/tests/bindings_test_replace.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/python_bindings/tests/bindings_test_replace.py b/python_bindings/tests/bindings_test_replace.py index 366e80f2..502b309d 100644 --- a/python_bindings/tests/bindings_test_replace.py +++ b/python_bindings/tests/bindings_test_replace.py @@ -13,6 +13,8 @@ def testRandomSelf(self): num_elements = 5000 max_num_elements = 2 * num_elements + recall_threshold = 0.98 + # Generating sample data print("Generating data") # batch 1 @@ -119,7 +121,7 @@ def testRandomSelf(self): labels_found, _ = hnsw_index.knn_query(data4, k=1) recall = np.mean(labels_found.reshape(-1) == labels4) print(f"Recall for the 4 batch: {recall}") - self.assertGreater(recall, 0.98) + self.assertGreater(recall, recall_threshold) # Delete batch 4 print("Deleting batch 4") @@ -138,6 +140,6 @@ def testRandomSelf(self): labels_found, _ = hnsw_index_pckl.knn_query(data3, k=1) recall = np.mean(labels_found.reshape(-1) == labels3) print(f"Recall for the 3 batch: {recall}") - self.assertGreater(recall, 0.98) + self.assertGreater(recall, recall_threshold) os.remove(index_path) From c4bedcf0001564184941e54f871825960c963d85 Mon Sep 17 00:00:00 2001 From: Dmitry Yashunin Date: Sat, 12 Nov 2022 13:48:43 +0400 Subject: [PATCH 03/27] Refactoring --- hnswlib/hnswalg.h | 18 +++++++++--------- python_bindings/bindings.cpp | 16 ++++++++-------- python_bindings/tests/bindings_test_replace.py | 6 +++--- 3 files changed, 20 insertions(+), 20 deletions(-) diff --git a/hnswlib/hnswalg.h b/hnswlib/hnswalg.h index 5aa4e95f..81a35f19 100644 --- a/hnswlib/hnswalg.h +++ b/hnswlib/hnswalg.h @@ -66,7 +66,7 @@ class HierarchicalNSW : public AlgorithmInterface { mutable std::atomic metric_distance_computations{0}; mutable std::atomic metric_hops{0}; - bool replace_deleted = false; + bool replace_deleted_ = false; std::mutex deleted_elements_lock; std::unordered_set deleted_elements; @@ -82,7 +82,7 @@ class HierarchicalNSW : public AlgorithmInterface { bool nmslib = false, size_t max_elements = 0, bool replace_deleted = false) - : replace_deleted(replace_deleted) { + : replace_deleted_(replace_deleted) { loadIndex(location, s, max_elements); } @@ -97,7 +97,7 @@ class HierarchicalNSW : public AlgorithmInterface { : link_list_locks_(max_elements), link_list_update_locks_(max_update_element_locks), element_levels_(max_elements), - replace_deleted(replace_deleted) { + replace_deleted_(replace_deleted) { max_elements_ = max_elements; num_deleted_ = 0; data_size_ = s->get_data_size(); @@ -703,7 +703,7 @@ class HierarchicalNSW : public AlgorithmInterface { for (size_t i = 0; i < cur_element_count; i++) { if (isMarkedDeleted(i)) { num_deleted_ += 1; - if (replace_deleted) deleted_elements.insert(i); + if (replace_deleted_) deleted_elements.insert(i); } } @@ -757,7 +757,7 @@ class HierarchicalNSW : public AlgorithmInterface { unsigned char *ll_cur = ((unsigned char *)get_linklist0(internalId))+2; *ll_cur |= DELETE_MARK; num_deleted_ += 1; - if (replace_deleted) deleted_elements.insert(internalId); + if (replace_deleted_) deleted_elements.insert(internalId); } else { throw std::runtime_error("The requested to delete element is already deleted"); } @@ -786,7 +786,7 @@ class HierarchicalNSW : public AlgorithmInterface { unsigned char *ll_cur = ((unsigned char *)get_linklist0(internalId)) + 2; *ll_cur &= ~DELETE_MARK; num_deleted_ -= 1; - if (replace_deleted) deleted_elements.erase(internalId); + if (replace_deleted_) deleted_elements.erase(internalId); } else { throw std::runtime_error("The requested to undelete element is not deleted"); } @@ -817,9 +817,9 @@ class HierarchicalNSW : public AlgorithmInterface { * * If deleted point was replaced returns its label, else returns label of added point */ - labeltype insertPoint(const void* data_point, labeltype label) { - if (!replace_deleted) { - throw std::runtime_error("Don't use insertPoint when replacement of deleted elements is disabled"); + labeltype addPointToVacantPlace(const void* data_point, labeltype label) { + if (!replace_deleted_) { + throw std::runtime_error("Can't use addPointToVacantPlace when replacement of deleted elements is disabled"); } std::unique_lock tmp_del_el_lock(deleted_elements_lock); diff --git a/python_bindings/bindings.cpp b/python_bindings/bindings.cpp index 668b7a95..aa06b483 100644 --- a/python_bindings/bindings.cpp +++ b/python_bindings/bindings.cpp @@ -230,7 +230,7 @@ class Index { } - py::object insertItems_return_numpy(py::object input, py::object ids_ = py::none(), int num_threads = -1) { + py::object add_items_to_vacant_place_return_numpy(py::object input, py::object ids_ = py::none(), int num_threads = -1) { size_t rows, features; hnswlib::labeltype* data_numpy_l = NULL; @@ -261,7 +261,7 @@ class Index { normalize_vector(vector_data, norm_array.data()); vector_data = norm_array.data(); } - hnswlib::labeltype label = appr_alg->insertPoint((void*)vector_data, (size_t)id); + hnswlib::labeltype label = appr_alg->addPointToVacantPlace((void*)vector_data, (size_t)id); data_numpy_l[start] = label; start = 1; ep_added = true; @@ -271,7 +271,7 @@ class Index { if (normalize == false) { ParallelFor(start, rows, num_threads, [&](size_t row, size_t threadId) { size_t id = ids.size() ? ids.at(row) : (cur_l + row); - hnswlib::labeltype label = appr_alg->insertPoint((void*)items.data(row), (size_t)id); + hnswlib::labeltype label = appr_alg->addPointToVacantPlace((void*)items.data(row), (size_t)id); data_numpy_l[row] = label; }); } @@ -283,7 +283,7 @@ class Index { normalize_vector((float*)items.data(row), (norm_array.data() + start_idx)); size_t id = ids.size() ? ids.at(row) : (cur_l + row); - hnswlib::labeltype label = appr_alg->insertPoint((void*)(norm_array.data() + start_idx), (size_t)id); + hnswlib::labeltype label = appr_alg->addPointToVacantPlace((void*)(norm_array.data() + start_idx), (size_t)id); data_numpy_l[row] = label; }); } @@ -472,7 +472,7 @@ class Index { "ef"_a = appr_alg->ef_, "has_deletions"_a = (bool)appr_alg->num_deleted_, "size_links_per_element"_a = appr_alg->size_links_per_element_, - "replace_deleted"_a = appr_alg->replace_deleted, + "replace_deleted"_a = appr_alg->replace_deleted_, "label_lookup_external"_a = py::array_t( { appr_alg->label_lookup_.size() }, // shape @@ -639,7 +639,7 @@ class Index { if (d.contains("replace_deleted")) { replace_deleted = d["replace_deleted"].cast(); } - appr_alg->replace_deleted = replace_deleted; + appr_alg->replace_deleted_= replace_deleted; appr_alg->num_deleted_ = 0; bool has_deletions = d["has_deletions"].cast(); @@ -941,8 +941,8 @@ PYBIND11_PLUGIN(hnswlib) { py::arg("data"), py::arg("ids") = py::none(), py::arg("num_threads") = -1) - .def("insert_items", - &Index::insertItems_return_numpy, + .def("add_items_to_vacant_place", + &Index::add_items_to_vacant_place_return_numpy, py::arg("data"), py::arg("ids") = py::none(), py::arg("num_threads") = -1) diff --git a/python_bindings/tests/bindings_test_replace.py b/python_bindings/tests/bindings_test_replace.py index 502b309d..c8577339 100644 --- a/python_bindings/tests/bindings_test_replace.py +++ b/python_bindings/tests/bindings_test_replace.py @@ -72,7 +72,7 @@ def testRandomSelf(self): print("Inserting batch 3 by replacing deleted elements") # Maximum number of elements is reached therefore we cannot add new items # but we can replace the deleted ones - labels_replaced = hnsw_index.insert_items(data3, labels3) + labels_replaced = hnsw_index.add_items_to_vacant_place(data3, labels3) labels2_deleted_list = [l[0] for l in labels2_deleted] labels_replaced_list = labels_replaced.tolist() labels2_deleted_list.sort() @@ -114,7 +114,7 @@ def testRandomSelf(self): # Insert batch 4 print("Inserting batch 4 by replacing deleted elements") - labels_replaced = hnsw_index.insert_items(data4, labels4) + labels_replaced = hnsw_index.add_items_to_vacant_place(data4, labels4) # Check recall print("Checking recall") @@ -133,7 +133,7 @@ def testRandomSelf(self): del hnsw_index # Insert batch 3 print("Inserting batch 3 by replacing deleted elements") - labels_replaced = hnsw_index_pckl.insert_items(data3, labels3) + labels_replaced = hnsw_index_pckl.add_items_to_vacant_place(data3, labels3) # Check recall print("Checking recall") From 0f3214c0e275827186bb7a5aa82c9e430f67f81d Mon Sep 17 00:00:00 2001 From: Dmitry Yashunin Date: Sat, 12 Nov 2022 16:00:39 +0400 Subject: [PATCH 04/27] Add stress test to check multithreading --- .gitignore | 1 + .../tests/bindings_test_replace.py | 2 +- .../tests/bindings_test_stress_mt_replace.py | 62 +++++++++++++++++++ 3 files changed, 64 insertions(+), 1 deletion(-) create mode 100644 python_bindings/tests/bindings_test_stress_mt_replace.py diff --git a/.gitignore b/.gitignore index a338107c..48f74604 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,4 @@ var/ .idea/ .vscode/ .vs/ +**.DS_Store diff --git a/python_bindings/tests/bindings_test_replace.py b/python_bindings/tests/bindings_test_replace.py index c8577339..63f7280f 100644 --- a/python_bindings/tests/bindings_test_replace.py +++ b/python_bindings/tests/bindings_test_replace.py @@ -57,7 +57,7 @@ def testRandomSelf(self): for l in labels2_deleted: hnsw_index.mark_deleted(l[0]) labels1_found, _ = hnsw_index.knn_query(data1, k=1) - items = hnsw_index.get_items(labels1) + items = hnsw_index.get_items(labels1_found) diff_with_gt_labels = np.mean(np.abs(data1-items)) self.assertAlmostEqual(diff_with_gt_labels, 0, delta=1e-3) diff --git a/python_bindings/tests/bindings_test_stress_mt_replace.py b/python_bindings/tests/bindings_test_stress_mt_replace.py new file mode 100644 index 00000000..551c6fe9 --- /dev/null +++ b/python_bindings/tests/bindings_test_stress_mt_replace.py @@ -0,0 +1,62 @@ +import unittest + +import numpy as np + +import hnswlib + + +class RandomSelfTestCase(unittest.TestCase): + def testRandomSelf(self): + dim = 16 + num_elements = 1_000 + max_num_elements = 2 * num_elements + + # Generating sample data + # batch 1 + first_id = 0 + last_id = num_elements + labels1 = np.arange(first_id, last_id) + data1 = np.float32(np.random.random((num_elements, dim))) + # batch 2 + first_id += num_elements + last_id += num_elements + labels2 = np.arange(first_id, last_id) + data2 = np.float32(np.random.random((num_elements, dim))) + # batch 3 + first_id += num_elements + last_id += num_elements + labels3 = np.arange(first_id, last_id) + data3 = np.float32(np.random.random((num_elements, dim))) + + # Declaring index + for _ in range(100): + hnsw_index = hnswlib.Index(space='l2', dim=dim) + hnsw_index.init_index(max_elements=max_num_elements, ef_construction=200, M=16, replace_deleted=True) + + hnsw_index.set_ef(100) + hnsw_index.set_num_threads(50) + + # Add batch 1 and 2 + hnsw_index.add_items(data1, labels1) + hnsw_index.add_items(data2, labels2) # maximum number of elements is reached + + # Delete nearest neighbors of batch 2 + labels2_deleted, _ = hnsw_index.knn_query(data2, k=1) + for l in labels2_deleted: + hnsw_index.mark_deleted(l[0]) + labels1_found, _ = hnsw_index.knn_query(data1, k=1) + items = hnsw_index.get_items(labels1_found) + diff_with_gt_labels = np.mean(np.abs(data1-items)) + self.assertAlmostEqual(diff_with_gt_labels, 0, delta=1e-3) + + labels2_after, _ = hnsw_index.knn_query(data2, k=1) + labels2_after_flat = labels2_after.flatten() + labels2_deleted_flat = labels2_deleted.flatten() + common = np.intersect1d(labels2_after_flat, labels2_deleted_flat) + self.assertTrue(common.size == 0) + + # Replace deleted elements + # Maximum number of elements is reached therefore we cannot add new items + # but we can replace the deleted ones + labels_replaced = hnsw_index.add_items_to_vacant_place(data3, labels3) + \ No newline at end of file From 34fe7f19a02b51e8bd5c21932770ded1ef844898 Mon Sep 17 00:00:00 2001 From: Dmitry Yashunin Date: Mon, 14 Nov 2022 14:12:26 +0400 Subject: [PATCH 05/27] Fix possible multithreading issues --- hnswlib/hnswalg.h | 116 ++++++++++++++++++++++++++++++---------------- 1 file changed, 75 insertions(+), 41 deletions(-) diff --git a/hnswlib/hnswalg.h b/hnswlib/hnswalg.h index 9a91fe49..f58a5b6f 100644 --- a/hnswlib/hnswalg.h +++ b/hnswlib/hnswalg.h @@ -38,10 +38,9 @@ class HierarchicalNSW : public AlgorithmInterface { // Locks to prevent race condition during update/insert of an element at same time. // Note: Locks for additions can also be used to prevent this race condition // if the querying of KNN is not exposed along with update/inserts i.e multithread insert/update/query in parallel. - std::vector link_list_update_locks_; + mutable std::vector link_list_update_locks_; std::mutex global; - std::mutex cur_element_count_guard_; std::vector link_list_locks_; tableint enterpoint_node_{0}; @@ -57,7 +56,8 @@ class HierarchicalNSW : public AlgorithmInterface { DISTFUNC fstdistfunc_; void *dist_func_param_{nullptr}; - std::mutex label_lookup_lock; + + mutable std::mutex label_lookup_lock; // lock for label_lookup_ std::unordered_map label_lookup_; std::default_random_engine level_generator_; @@ -68,7 +68,7 @@ class HierarchicalNSW : public AlgorithmInterface { bool replace_deleted_ = false; - std::mutex deleted_elements_lock; + std::mutex deleted_elements_lock; // lock for deleted_elements std::unordered_set deleted_elements; @@ -714,14 +714,16 @@ class HierarchicalNSW : public AlgorithmInterface { template std::vector getDataByLabel(labeltype label) const { - tableint label_internal; + std::unique_lock lock_table(label_lookup_lock); auto search = label_lookup_.find(label); if (search == label_lookup_.end() || isMarkedDeleted(search->second)) { throw std::runtime_error("Label not found"); } - label_internal = search->second; - - char* data_ptrv = getDataByInternalId(label_internal); + tableint internalId = search->second; + lock_table.unlock(); + // wait for element addition or update + std::unique_lock lock_el_update(link_list_update_locks_[(internalId & (max_update_element_locks - 1))]); + char* data_ptrv = getDataByInternalId(internalId); size_t dim = *((size_t *) dist_func_param_); std::vector data; data_t* data_ptr = (data_t*) data_ptrv; @@ -737,11 +739,15 @@ class HierarchicalNSW : public AlgorithmInterface { * Marks an element with the given label deleted, does NOT really change the current graph. */ void markDelete(labeltype label) { + std::unique_lock lock_table(label_lookup_lock); auto search = label_lookup_.find(label); if (search == label_lookup_.end()) { throw std::runtime_error("Label not found"); } tableint internalId = search->second; + lock_table.unlock(); + // wait for element addition or update + std::unique_lock lock_el_update(link_list_update_locks_[(internalId & (max_update_element_locks - 1))]); markDeletedInternal(internalId); } @@ -756,7 +762,10 @@ class HierarchicalNSW : public AlgorithmInterface { unsigned char *ll_cur = ((unsigned char *)get_linklist0(internalId))+2; *ll_cur |= DELETE_MARK; num_deleted_ += 1; - if (replace_deleted_) deleted_elements.insert(internalId); + if (replace_deleted_) { + std::unique_lock lock_deleted_elements(deleted_elements_lock); + deleted_elements.insert(internalId); + } } else { throw std::runtime_error("The requested to delete element is already deleted"); } @@ -767,25 +776,36 @@ class HierarchicalNSW : public AlgorithmInterface { * Remove the deleted mark of the node, does NOT really change the current graph. */ void unmarkDelete(labeltype label) { + std::unique_lock lock_table(label_lookup_lock); auto search = label_lookup_.find(label); if (search == label_lookup_.end()) { throw std::runtime_error("Label not found"); } tableint internalId = search->second; + lock_table.unlock(); + // wait for element addition or update + std::unique_lock lock_el_update(link_list_update_locks_[(internalId & (max_update_element_locks - 1))]); unmarkDeletedInternal(internalId); } + /** - * Remove the deleted mark of the node. - */ + * Remove the deleted mark of the node. + * + * Note: the method is not safe to use when replacement of deleted elements is enabled + * bacause elements marked as deleted can be completely removed from the index + */ void unmarkDeletedInternal(tableint internalId) { assert(internalId < cur_element_count); if (isMarkedDeleted(internalId)) { unsigned char *ll_cur = ((unsigned char *)get_linklist0(internalId)) + 2; *ll_cur &= ~DELETE_MARK; num_deleted_ -= 1; - if (replace_deleted_) deleted_elements.erase(internalId); + if (replace_deleted_) { + std::unique_lock lock_deleted_elements(deleted_elements_lock); + deleted_elements.erase(internalId); + } } else { throw std::runtime_error("The requested to undelete element is not deleted"); } @@ -813,42 +833,49 @@ class HierarchicalNSW : public AlgorithmInterface { /** * Adds point and replaces previously deleted point if any, updating it with new point - * - * If deleted point was replaced returns its label, else returns label of added point + * If deleted point was replaced returns its label, else returns label of added or updated point + * + * Note: + * Methods that can work with deleted elements unmarkDelete and addPoint are not safe to use + * with this method. Because addPointToVacantPlace removes deleted elements from the index. */ labeltype addPointToVacantPlace(const void* data_point, labeltype label) { if (!replace_deleted_) { throw std::runtime_error("Can't use addPointToVacantPlace when replacement of deleted elements is disabled"); } - std::unique_lock tmp_del_el_lock(deleted_elements_lock); - bool is_empty = deleted_elements.empty(); - tmp_del_el_lock.unlock(); - - if (is_empty) { - addPoint(data_point, label); - return label; + // check if there is vacant place + tableint internal_id_replaced; + std::unique_lock lock_deleted_elements(deleted_elements_lock); + bool is_vacant_place = !deleted_elements.empty(); + if (is_vacant_place) { + internal_id_replaced = *deleted_elements.begin(); + deleted_elements.erase(internal_id_replaced); } - else { - tmp_del_el_lock.lock(); - tableint id_replace = *deleted_elements.begin(); - deleted_elements.erase(id_replace); - tmp_del_el_lock.unlock(); - - // use link list locks to not block calls for other elements - std::unique_lock lock_label_update(link_list_update_locks_[(id_replace & (max_update_element_locks - 1))]); - labeltype label_replace = getExternalLabel(id_replace); - setExternalLabel(id_replace, label); - lock_label_update.unlock(); - - std::unique_lock tmp_label_lookup_lock(label_lookup_lock); - label_lookup_.erase(label_replace); - label_lookup_[label] = id_replace; - tmp_label_lookup_lock.unlock(); + lock_deleted_elements.unlock(); + // if there is no vacant place then add or update point + // else add point to vacant place + if (!is_vacant_place) { addPoint(data_point, label); - - return label_replace; + return label; + } else { + // wait for element addition or update + std::unique_lock lock_el_update(link_list_update_locks_[(internal_id_replaced & (max_update_element_locks - 1))]); + labeltype label_replaced = getExternalLabel(internal_id_replaced); + setExternalLabel(internal_id_replaced, label); + lock_el_update.unlock(); + + std::unique_lock lock_table(label_lookup_lock); + label_lookup_.erase(label_replaced); + label_lookup_[label] = internal_id_replaced; + lock_table.unlock(); + + lock_el_update.lock(); + unmarkDeletedInternal(internal_id_replaced); + updatePoint(data_point, internal_id_replaced, 1.0); + + return label_replaced; } } @@ -1024,11 +1051,18 @@ class HierarchicalNSW : public AlgorithmInterface { { // Checking if the element with the same label already exists // if so, updating it *instead* of creating a new element. - std::unique_lock templock_curr(cur_element_count_guard_); + std::unique_lock lock_table(label_lookup_lock); auto search = label_lookup_.find(label); if (search != label_lookup_.end()) { tableint existingInternalId = search->second; - templock_curr.unlock(); + if (replace_deleted_) { + // wait for element addition or update + std::unique_lock lock_el_update(link_list_update_locks_[(existingInternalId & (max_update_element_locks - 1))]); + if (isMarkedDeleted(existingInternalId)) { + throw std::runtime_error("Can't use addPoint to update deleted elements if replacement of deleted elements is enabled."); + } + } + lock_table.unlock(); std::unique_lock lock_el_update(link_list_update_locks_[(existingInternalId & (max_update_element_locks - 1))]); From 22cdb466c8f4495c424587185244c018088f979a Mon Sep 17 00:00:00 2001 From: Dmitry Yashunin Date: Mon, 14 Nov 2022 14:40:39 +0400 Subject: [PATCH 06/27] Refactoring --- hnswlib/hnswalg.h | 26 +++++++++----------------- 1 file changed, 9 insertions(+), 17 deletions(-) diff --git a/hnswlib/hnswalg.h b/hnswlib/hnswalg.h index f58a5b6f..20284634 100644 --- a/hnswlib/hnswalg.h +++ b/hnswlib/hnswalg.h @@ -721,8 +721,6 @@ class HierarchicalNSW : public AlgorithmInterface { } tableint internalId = search->second; lock_table.unlock(); - // wait for element addition or update - std::unique_lock lock_el_update(link_list_update_locks_[(internalId & (max_update_element_locks - 1))]); char* data_ptrv = getDataByInternalId(internalId); size_t dim = *((size_t *) dist_func_param_); std::vector data; @@ -746,8 +744,6 @@ class HierarchicalNSW : public AlgorithmInterface { } tableint internalId = search->second; lock_table.unlock(); - // wait for element addition or update - std::unique_lock lock_el_update(link_list_update_locks_[(internalId & (max_update_element_locks - 1))]); markDeletedInternal(internalId); } @@ -773,8 +769,11 @@ class HierarchicalNSW : public AlgorithmInterface { /** - * Remove the deleted mark of the node, does NOT really change the current graph. - */ + * Remove the deleted mark of the node, does NOT really change the current graph. + * + * Note: the method is not safe to use when replacement of deleted elements is enabled + * bacause elements marked as deleted can be completely removed from the index + */ void unmarkDelete(labeltype label) { std::unique_lock lock_table(label_lookup_lock); auto search = label_lookup_.find(label); @@ -783,19 +782,14 @@ class HierarchicalNSW : public AlgorithmInterface { } tableint internalId = search->second; lock_table.unlock(); - // wait for element addition or update - std::unique_lock lock_el_update(link_list_update_locks_[(internalId & (max_update_element_locks - 1))]); unmarkDeletedInternal(internalId); } /** - * Remove the deleted mark of the node. - * - * Note: the method is not safe to use when replacement of deleted elements is enabled - * bacause elements marked as deleted can be completely removed from the index - */ + * Remove the deleted mark of the node. + */ void unmarkDeletedInternal(tableint internalId) { assert(internalId < cur_element_count); if (isMarkedDeleted(internalId)) { @@ -860,18 +854,16 @@ class HierarchicalNSW : public AlgorithmInterface { addPoint(data_point, label); return label; } else { - // wait for element addition or update - std::unique_lock lock_el_update(link_list_update_locks_[(internal_id_replaced & (max_update_element_locks - 1))]); + // no need to protect element from additions and updates as + // we assume that there are no concurrent operations on deleted element labeltype label_replaced = getExternalLabel(internal_id_replaced); setExternalLabel(internal_id_replaced, label); - lock_el_update.unlock(); std::unique_lock lock_table(label_lookup_lock); label_lookup_.erase(label_replaced); label_lookup_[label] = internal_id_replaced; lock_table.unlock(); - lock_el_update.lock(); unmarkDeletedInternal(internal_id_replaced); updatePoint(data_point, internal_id_replaced, 1.0); From 1f12fdbfdeed580530e3b7e0c5ea1e0168e7c7a5 Mon Sep 17 00:00:00 2001 From: Dmitry Yashunin Date: Sat, 26 Nov 2022 17:22:23 +0400 Subject: [PATCH 07/27] Add C++ multi thread load test --- .github/workflows/build.yml | 1 + CMakeLists.txt | 3 ++ examples/multiThreadLoad_test.cpp | 53 +++++++++++++++++++++++++++++++ 3 files changed, 57 insertions(+) create mode 100644 examples/multiThreadLoad_test.cpp diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index e70f94c7..82d92a53 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -59,6 +59,7 @@ jobs: fi ./searchKnnCloserFirst_test ./searchKnnWithFilter_test + ./multiThreadLoad_test ./test_updates ./test_updates update shell: bash diff --git a/CMakeLists.txt b/CMakeLists.txt index e42d6cee..932a2872 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -25,6 +25,9 @@ if(CMAKE_PROJECT_NAME STREQUAL PROJECT_NAME) add_executable(searchKnnWithFilter_test examples/searchKnnWithFilter_test.cpp) target_link_libraries(searchKnnWithFilter_test hnswlib) + add_executable(multiThreadLoad_test examples/multiThreadLoad_test.cpp) + target_link_libraries(multiThreadLoad_test hnswlib) + add_executable(main main.cpp sift_1b.cpp) target_link_libraries(main hnswlib) endif() diff --git a/examples/multiThreadLoad_test.cpp b/examples/multiThreadLoad_test.cpp new file mode 100644 index 00000000..43b1c0a4 --- /dev/null +++ b/examples/multiThreadLoad_test.cpp @@ -0,0 +1,53 @@ +#include "../hnswlib/hnswlib.h" +#include + +int main() { + int d = 16; + int max_elements = 100; + + std::mt19937 rng; + rng.seed(47); + std::uniform_real_distribution<> distrib_real; + + hnswlib::L2Space space(d); + hnswlib::HierarchicalNSW* alg_hnsw = new hnswlib::HierarchicalNSW(&space, max_elements); + + int num_threads = 40; + int num_ids = 1; + + int num_iterations = 10; + int start_id = 0; + + while (true) { + std::uniform_int_distribution<> distrib_int(start_id, start_id + num_ids - 1); + std::vector threads; + for (size_t thread_id = 0; thread_id < num_threads; thread_id++) { + threads.push_back( + std::thread( + [&] { + for (int iter = 0; iter < num_iterations; iter++) { + std::vector data(d); + int id = distrib_int(rng); + //std::cout << id << std::endl; + for (int i = 0; i < d; i++) { + data[i] = distrib_real(rng); + } + alg_hnsw->addPoint(data.data(), id); + } + } + ) + ); + } + for (auto &thread : threads) { + thread.join(); + } + //std::cout << alg_hnsw->cur_element_count << std::endl; + if (alg_hnsw->cur_element_count > max_elements - num_ids) { + //std::cout << "Exit" << std::endl; + break; + } + start_id += num_ids; + } + + return 0; +} From 270b2370679d97cdbf884548f8dcdedbcdae5ce1 Mon Sep 17 00:00:00 2001 From: Dmitry Yashunin Date: Sat, 26 Nov 2022 17:31:49 +0400 Subject: [PATCH 08/27] Add timeout to jobs in actions --- .github/workflows/build.yml | 2 ++ examples/multiThreadLoad_test.cpp | 5 ++++- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 82d92a53..153ebdb2 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -19,6 +19,7 @@ jobs: run: python -m pip install . - name: Test + timeout-minutes: 15 run: python -m unittest discover -v --start-directory python_bindings/tests --pattern "*_test*.py" test_cpp: @@ -52,6 +53,7 @@ jobs: shell: bash - name: Test + timeout-minutes: 15 run: | cd build if [ "$RUNNER_OS" == "Windows" ]; then diff --git a/examples/multiThreadLoad_test.cpp b/examples/multiThreadLoad_test.cpp index 43b1c0a4..39fd9b1e 100644 --- a/examples/multiThreadLoad_test.cpp +++ b/examples/multiThreadLoad_test.cpp @@ -2,8 +2,10 @@ #include int main() { + + std::cout << "Running multithread load test" << std::endl; int d = 16; - int max_elements = 100; + int max_elements = 1000; std::mt19937 rng; rng.seed(47); @@ -49,5 +51,6 @@ int main() { start_id += num_ids; } + std::cout << "Finish" << std::endl; return 0; } From c2fb5740a97e20bc4cdef39bc40ed767cba73941 Mon Sep 17 00:00:00 2001 From: Dmitry Yashunin Date: Sat, 26 Nov 2022 20:13:15 +0400 Subject: [PATCH 09/27] Add locks by label --- examples/multiThreadLoad_test.cpp | 3 +- hnswlib/hnswalg.h | 83 +++++++++++++++++++++---------- 2 files changed, 60 insertions(+), 26 deletions(-) diff --git a/examples/multiThreadLoad_test.cpp b/examples/multiThreadLoad_test.cpp index 39fd9b1e..f12bf106 100644 --- a/examples/multiThreadLoad_test.cpp +++ b/examples/multiThreadLoad_test.cpp @@ -14,8 +14,9 @@ int main() { hnswlib::L2Space space(d); hnswlib::HierarchicalNSW* alg_hnsw = new hnswlib::HierarchicalNSW(&space, max_elements); + // with these parameters about 7 threads will do operations with the same label simultaneously int num_threads = 40; - int num_ids = 1; + int num_ids = 10; int num_iterations = 10; int start_id = 0; diff --git a/hnswlib/hnswalg.h b/hnswlib/hnswalg.h index 20284634..8ae39f4c 100644 --- a/hnswlib/hnswalg.h +++ b/hnswlib/hnswalg.h @@ -16,7 +16,8 @@ typedef unsigned int linklistsizeint; template class HierarchicalNSW : public AlgorithmInterface { public: - static const tableint max_update_element_locks = 65536; + static const tableint MAX_ELEMENT_UPDATE_LOCKS = 65536; + static const tableint MAX_LABEL_OPERATION_LOCKS = 65536; static const unsigned char DELETE_MARK = 0x01; size_t max_elements_{0}; @@ -38,7 +39,9 @@ class HierarchicalNSW : public AlgorithmInterface { // Locks to prevent race condition during update/insert of an element at same time. // Note: Locks for additions can also be used to prevent this race condition // if the querying of KNN is not exposed along with update/inserts i.e multithread insert/update/query in parallel. - mutable std::vector link_list_update_locks_; + mutable std::vector element_update_locks_; + // Locks operations with element by label value + mutable std::vector label_op_locks_; std::mutex global; std::vector link_list_locks_; @@ -95,7 +98,8 @@ class HierarchicalNSW : public AlgorithmInterface { size_t random_seed = 100, bool replace_deleted = false) : link_list_locks_(max_elements), - link_list_update_locks_(max_update_element_locks), + element_update_locks_(MAX_ELEMENT_UPDATE_LOCKS), + label_op_locks_(MAX_LABEL_OPERATION_LOCKS), element_levels_(max_elements), replace_deleted_(replace_deleted) { max_elements_ = max_elements; @@ -163,6 +167,20 @@ class HierarchicalNSW : public AlgorithmInterface { } + inline std::mutex& getUpdateElMutex(tableint internal_id) const { + // calculate hash + size_t lock_id = internal_id & (MAX_ELEMENT_UPDATE_LOCKS - 1); + return element_update_locks_[lock_id]; + } + + + inline std::mutex& getLabelOpMutex(labeltype label) const { + // calculate hash + size_t lock_id = label & (MAX_LABEL_OPERATION_LOCKS - 1); + return label_op_locks_[lock_id]; + } + + inline labeltype getExternalLabel(tableint internal_id) const { labeltype return_label; memcpy(&return_label, (data_level0_memory_ + internal_id * size_data_per_element_ + label_offset_), sizeof(labeltype)); @@ -673,7 +691,8 @@ class HierarchicalNSW : public AlgorithmInterface { size_links_level0_ = maxM0_ * sizeof(tableint) + sizeof(linklistsizeint); std::vector(max_elements).swap(link_list_locks_); - std::vector(max_update_element_locks).swap(link_list_update_locks_); + std::vector(MAX_ELEMENT_UPDATE_LOCKS).swap(element_update_locks_); + std::vector(MAX_LABEL_OPERATION_LOCKS).swap(label_op_locks_); visited_list_pool_ = new VisitedListPool(1, max_elements); @@ -714,6 +733,9 @@ class HierarchicalNSW : public AlgorithmInterface { template std::vector getDataByLabel(labeltype label) const { + // lock all operations with element by label + std::unique_lock lock_label(getLabelOpMutex(label)); + std::unique_lock lock_table(label_lookup_lock); auto search = label_lookup_.find(label); if (search == label_lookup_.end() || isMarkedDeleted(search->second)) { @@ -721,6 +743,7 @@ class HierarchicalNSW : public AlgorithmInterface { } tableint internalId = search->second; lock_table.unlock(); + char* data_ptrv = getDataByInternalId(internalId); size_t dim = *((size_t *) dist_func_param_); std::vector data; @@ -733,10 +756,13 @@ class HierarchicalNSW : public AlgorithmInterface { } - /** - * Marks an element with the given label deleted, does NOT really change the current graph. - */ + /* + * Marks an element with the given label deleted, does NOT really change the current graph. + */ void markDelete(labeltype label) { + // lock all operations with element by label + std::unique_lock lock_label(getLabelOpMutex(label)); + std::unique_lock lock_table(label_lookup_lock); auto search = label_lookup_.find(label); if (search == label_lookup_.end()) { @@ -744,14 +770,15 @@ class HierarchicalNSW : public AlgorithmInterface { } tableint internalId = search->second; lock_table.unlock(); + markDeletedInternal(internalId); } - /** - * Uses the last 16 bits of the memory for the linked list size to store the mark, - * whereas maxM0_ has to be limited to the lower 16 bits, however, still large enough in almost all cases. - */ + /* + * Uses the last 16 bits of the memory for the linked list size to store the mark, + * whereas maxM0_ has to be limited to the lower 16 bits, however, still large enough in almost all cases. + */ void markDeletedInternal(tableint internalId) { assert(internalId < cur_element_count); if (!isMarkedDeleted(internalId)) { @@ -768,13 +795,16 @@ class HierarchicalNSW : public AlgorithmInterface { } - /** - * Remove the deleted mark of the node, does NOT really change the current graph. + /* + * Removes the deleted mark of the node, does NOT really change the current graph. * * Note: the method is not safe to use when replacement of deleted elements is enabled * bacause elements marked as deleted can be completely removed from the index */ void unmarkDelete(labeltype label) { + // lock all operations with element by label + std::unique_lock lock_label(getLabelOpMutex(label)); + std::unique_lock lock_table(label_lookup_lock); auto search = label_lookup_.find(label); if (search == label_lookup_.end()) { @@ -782,14 +812,15 @@ class HierarchicalNSW : public AlgorithmInterface { } tableint internalId = search->second; lock_table.unlock(); + unmarkDeletedInternal(internalId); } - /** - * Remove the deleted mark of the node. - */ + /* + * Remove the deleted mark of the node. + */ void unmarkDeletedInternal(tableint internalId) { assert(internalId < cur_element_count); if (isMarkedDeleted(internalId)) { @@ -806,9 +837,9 @@ class HierarchicalNSW : public AlgorithmInterface { } - /** - * Checks the first 16 bits of the memory to see if the element is marked deleted. - */ + /* + * Checks the first 16 bits of the memory to see if the element is marked deleted. + */ bool isMarkedDeleted(tableint internalId) const { unsigned char *ll_cur = ((unsigned char*)get_linklist0(internalId)) + 2; return *ll_cur & DELETE_MARK; @@ -825,7 +856,7 @@ class HierarchicalNSW : public AlgorithmInterface { } - /** + /* * Adds point and replaces previously deleted point if any, updating it with new point * If deleted point was replaced returns its label, else returns label of added or updated point * @@ -872,10 +903,12 @@ class HierarchicalNSW : public AlgorithmInterface { } - /** - * Adds point. Updates the point if it is already in the index + /* + * Adds point. Updates the point if it is already in the index */ void addPoint(const void *data_point, labeltype label) { + // lock all operations with element by label + std::unique_lock lock_label(getLabelOpMutex(label)); addPoint(data_point, label, -1); } @@ -1049,14 +1082,14 @@ class HierarchicalNSW : public AlgorithmInterface { tableint existingInternalId = search->second; if (replace_deleted_) { // wait for element addition or update - std::unique_lock lock_el_update(link_list_update_locks_[(existingInternalId & (max_update_element_locks - 1))]); + std::unique_lock lock_el_update(getUpdateElMutex(existingInternalId)); if (isMarkedDeleted(existingInternalId)) { throw std::runtime_error("Can't use addPoint to update deleted elements if replacement of deleted elements is enabled."); } } lock_table.unlock(); - std::unique_lock lock_el_update(link_list_update_locks_[(existingInternalId & (max_update_element_locks - 1))]); + std::unique_lock lock_el_update(getUpdateElMutex(existingInternalId)); if (isMarkedDeleted(existingInternalId)) { unmarkDeletedInternal(existingInternalId); @@ -1076,7 +1109,7 @@ class HierarchicalNSW : public AlgorithmInterface { } // Take update lock to prevent race conditions on an element with insertion/update at the same time. - std::unique_lock lock_el_update(link_list_update_locks_[(cur_c & (max_update_element_locks - 1))]); + std::unique_lock lock_el_update(getUpdateElMutex(cur_c)); std::unique_lock lock_el(link_list_locks_[cur_c]); int curlevel = getRandomLevel(mult_); if (level > 0) From c750df8385354a2bb003c04bec5ea99a55972e71 Mon Sep 17 00:00:00 2001 From: Dmitry Yashunin Date: Sat, 26 Nov 2022 20:20:19 +0400 Subject: [PATCH 10/27] Remove previous element update locks as now we have locks by label --- hnswlib/hnswalg.h | 20 ------------------- python_bindings/tests/bindings_test_recall.py | 2 +- 2 files changed, 1 insertion(+), 21 deletions(-) diff --git a/hnswlib/hnswalg.h b/hnswlib/hnswalg.h index 8ae39f4c..e47d8726 100644 --- a/hnswlib/hnswalg.h +++ b/hnswlib/hnswalg.h @@ -16,7 +16,6 @@ typedef unsigned int linklistsizeint; template class HierarchicalNSW : public AlgorithmInterface { public: - static const tableint MAX_ELEMENT_UPDATE_LOCKS = 65536; static const tableint MAX_LABEL_OPERATION_LOCKS = 65536; static const unsigned char DELETE_MARK = 0x01; @@ -36,10 +35,6 @@ class HierarchicalNSW : public AlgorithmInterface { VisitedListPool *visited_list_pool_{nullptr}; - // Locks to prevent race condition during update/insert of an element at same time. - // Note: Locks for additions can also be used to prevent this race condition - // if the querying of KNN is not exposed along with update/inserts i.e multithread insert/update/query in parallel. - mutable std::vector element_update_locks_; // Locks operations with element by label value mutable std::vector label_op_locks_; @@ -98,7 +93,6 @@ class HierarchicalNSW : public AlgorithmInterface { size_t random_seed = 100, bool replace_deleted = false) : link_list_locks_(max_elements), - element_update_locks_(MAX_ELEMENT_UPDATE_LOCKS), label_op_locks_(MAX_LABEL_OPERATION_LOCKS), element_levels_(max_elements), replace_deleted_(replace_deleted) { @@ -167,13 +161,6 @@ class HierarchicalNSW : public AlgorithmInterface { } - inline std::mutex& getUpdateElMutex(tableint internal_id) const { - // calculate hash - size_t lock_id = internal_id & (MAX_ELEMENT_UPDATE_LOCKS - 1); - return element_update_locks_[lock_id]; - } - - inline std::mutex& getLabelOpMutex(labeltype label) const { // calculate hash size_t lock_id = label & (MAX_LABEL_OPERATION_LOCKS - 1); @@ -691,7 +678,6 @@ class HierarchicalNSW : public AlgorithmInterface { size_links_level0_ = maxM0_ * sizeof(tableint) + sizeof(linklistsizeint); std::vector(max_elements).swap(link_list_locks_); - std::vector(MAX_ELEMENT_UPDATE_LOCKS).swap(element_update_locks_); std::vector(MAX_LABEL_OPERATION_LOCKS).swap(label_op_locks_); visited_list_pool_ = new VisitedListPool(1, max_elements); @@ -1081,16 +1067,12 @@ class HierarchicalNSW : public AlgorithmInterface { if (search != label_lookup_.end()) { tableint existingInternalId = search->second; if (replace_deleted_) { - // wait for element addition or update - std::unique_lock lock_el_update(getUpdateElMutex(existingInternalId)); if (isMarkedDeleted(existingInternalId)) { throw std::runtime_error("Can't use addPoint to update deleted elements if replacement of deleted elements is enabled."); } } lock_table.unlock(); - std::unique_lock lock_el_update(getUpdateElMutex(existingInternalId)); - if (isMarkedDeleted(existingInternalId)) { unmarkDeletedInternal(existingInternalId); } @@ -1108,8 +1090,6 @@ class HierarchicalNSW : public AlgorithmInterface { label_lookup_[label] = cur_c; } - // Take update lock to prevent race conditions on an element with insertion/update at the same time. - std::unique_lock lock_el_update(getUpdateElMutex(cur_c)); std::unique_lock lock_el(link_list_locks_[cur_c]); int curlevel = getRandomLevel(mult_); if (level > 0) diff --git a/python_bindings/tests/bindings_test_recall.py b/python_bindings/tests/bindings_test_recall.py index 55a970d1..2190ba45 100644 --- a/python_bindings/tests/bindings_test_recall.py +++ b/python_bindings/tests/bindings_test_recall.py @@ -40,7 +40,7 @@ def testRandomSelf(self): # Set number of threads used during batch search/construction in hnsw # By default using all available cores - hnsw_index.set_num_threads(1) + hnsw_index.set_num_threads(4) print("Adding batch of %d elements" % (len(data))) hnsw_index.add_items(data) From ef7e383dc230ad6410dc23083b0797f4fd570f32 Mon Sep 17 00:00:00 2001 From: Dmitry Yashunin Date: Sat, 26 Nov 2022 20:46:29 +0400 Subject: [PATCH 11/27] Refactoring --- hnswlib/hnswalg.h | 2 +- python_bindings/bindings.cpp | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/hnswlib/hnswalg.h b/hnswlib/hnswalg.h index e47d8726..dcc25aa0 100644 --- a/hnswlib/hnswalg.h +++ b/hnswlib/hnswalg.h @@ -20,7 +20,7 @@ class HierarchicalNSW : public AlgorithmInterface { static const unsigned char DELETE_MARK = 0x01; size_t max_elements_{0}; - size_t cur_element_count{0}; + mutable std::atomic cur_element_count{0}; // current number of elements size_t size_data_per_element_{0}; size_t size_links_per_element_{0}; mutable std::atomic num_deleted_{0}; // number of deleted elements diff --git a/python_bindings/bindings.cpp b/python_bindings/bindings.cpp index ee20add1..4ea89329 100644 --- a/python_bindings/bindings.cpp +++ b/python_bindings/bindings.cpp @@ -473,7 +473,7 @@ class Index { return py::dict( "offset_level0"_a = appr_alg->offsetLevel0_, "max_elements"_a = appr_alg->max_elements_, - "cur_element_count"_a = appr_alg->cur_element_count, + "cur_element_count"_a = (size_t)appr_alg->cur_element_count, "size_data_per_element"_a = appr_alg->size_data_per_element_, "label_offset"_a = appr_alg->label_offset_, "offset_data"_a = appr_alg->offsetData_, @@ -1006,7 +1006,7 @@ PYBIND11_PLUGIN(hnswlib) { return index.index_inited ? index.appr_alg->max_elements_ : 0; }) .def_property_readonly("element_count", [](const Index & index) { - return index.index_inited ? index.appr_alg->cur_element_count : 0; + return index.index_inited ? (size_t)index.appr_alg->cur_element_count : 0; }) .def_property_readonly("ef_construction", [](const Index & index) { return index.index_inited ? index.appr_alg->ef_construction_ : 0; From 1741f50c9dfe576100ff35952d42a6ec873e86e5 Mon Sep 17 00:00:00 2001 From: Dmitry Yashunin Date: Sun, 27 Nov 2022 13:16:35 +0400 Subject: [PATCH 12/27] Update addPointToVacantPlace --- hnswlib/hnswalg.h | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/hnswlib/hnswalg.h b/hnswlib/hnswalg.h index dcc25aa0..1f72c323 100644 --- a/hnswlib/hnswalg.h +++ b/hnswlib/hnswalg.h @@ -784,8 +784,8 @@ class HierarchicalNSW : public AlgorithmInterface { /* * Removes the deleted mark of the node, does NOT really change the current graph. * - * Note: the method is not safe to use when replacement of deleted elements is enabled - * bacause elements marked as deleted can be completely removed from the index + * Note: the method is not safe to use when replacement of deleted elements is enabled, + * bacause elements marked as deleted can be completely removed by addPointToVacantPlace */ void unmarkDelete(labeltype label) { // lock all operations with element by label @@ -848,9 +848,12 @@ class HierarchicalNSW : public AlgorithmInterface { * * Note: * Methods that can work with deleted elements unmarkDelete and addPoint are not safe to use - * with this method. Because addPointToVacantPlace removes deleted elements from the index. + * with this method, because addPointToVacantPlace removes deleted elements from the index. */ labeltype addPointToVacantPlace(const void* data_point, labeltype label) { + // lock all operations with element by label + std::unique_lock lock_label(getLabelOpMutex(label)); + if (!replace_deleted_) { throw std::runtime_error("Can't use addPointToVacantPlace when replacement of deleted elements is disabled"); } @@ -868,10 +871,9 @@ class HierarchicalNSW : public AlgorithmInterface { // if there is no vacant place then add or update point // else add point to vacant place if (!is_vacant_place) { - addPoint(data_point, label); + addPoint(data_point, label, -1); return label; } else { - // no need to protect element from additions and updates as // we assume that there are no concurrent operations on deleted element labeltype label_replaced = getExternalLabel(internal_id_replaced); setExternalLabel(internal_id_replaced, label); @@ -891,6 +893,9 @@ class HierarchicalNSW : public AlgorithmInterface { /* * Adds point. Updates the point if it is already in the index + * + * Note: the method is not safe to use to update elements when replacement of deleted elements is enabled, + * bacause elements marked as deleted can be completely removed by addPointToVacantPlace: */ void addPoint(const void *data_point, labeltype label) { // lock all operations with element by label From 2ba0acc8de285d53d76b6a1ad489ea5ede16b55c Mon Sep 17 00:00:00 2001 From: Dmitry Yashunin Date: Sun, 27 Nov 2022 15:44:31 +0400 Subject: [PATCH 13/27] Update load test --- examples/multiThreadLoad_test.cpp | 109 ++++++++++++++++++++++++++---- hnswlib/hnswalg.h | 4 +- 2 files changed, 98 insertions(+), 15 deletions(-) diff --git a/examples/multiThreadLoad_test.cpp b/examples/multiThreadLoad_test.cpp index f12bf106..a713b2ba 100644 --- a/examples/multiThreadLoad_test.cpp +++ b/examples/multiThreadLoad_test.cpp @@ -1,8 +1,9 @@ #include "../hnswlib/hnswlib.h" #include +#include -int main() { +int main() { std::cout << "Running multithread load test" << std::endl; int d = 16; int max_elements = 1000; @@ -12,17 +13,21 @@ int main() { std::uniform_real_distribution<> distrib_real; hnswlib::L2Space space(d); - hnswlib::HierarchicalNSW* alg_hnsw = new hnswlib::HierarchicalNSW(&space, max_elements); + hnswlib::HierarchicalNSW* alg_hnsw = new hnswlib::HierarchicalNSW(&space, 2 * max_elements); - // with these parameters about 7 threads will do operations with the same label simultaneously + std::cout << "Building index" << std::endl; int num_threads = 40; - int num_ids = 10; + int num_labels = 10; int num_iterations = 10; - int start_id = 0; + int start_label = 0; + // run threads that will add elements to the index + // about 7 threads (the number depends on num_threads and num_labels) + // will add/update element with the same label simultaneously while (true) { - std::uniform_int_distribution<> distrib_int(start_id, start_id + num_ids - 1); + // add elements by batches + std::uniform_int_distribution<> distrib_int(start_label, start_label + num_labels - 1); std::vector threads; for (size_t thread_id = 0; thread_id < num_threads; thread_id++) { threads.push_back( @@ -30,12 +35,11 @@ int main() { [&] { for (int iter = 0; iter < num_iterations; iter++) { std::vector data(d); - int id = distrib_int(rng); - //std::cout << id << std::endl; + hnswlib::labeltype label = distrib_int(rng); for (int i = 0; i < d; i++) { data[i] = distrib_real(rng); } - alg_hnsw->addPoint(data.data(), id); + alg_hnsw->addPoint(data.data(), label); } } ) @@ -44,12 +48,91 @@ int main() { for (auto &thread : threads) { thread.join(); } - //std::cout << alg_hnsw->cur_element_count << std::endl; - if (alg_hnsw->cur_element_count > max_elements - num_ids) { - //std::cout << "Exit" << std::endl; + if (alg_hnsw->cur_element_count > max_elements - num_labels) { break; } - start_id += num_ids; + start_label += num_labels; + } + + // insert remaining elements if needed + for (hnswlib::labeltype label = 0; label < max_elements; label++) { + auto search = alg_hnsw->label_lookup_.find(label); + if (search == alg_hnsw->label_lookup_.end()) { + std::cout << "Adding " << label << std::endl; + std::vector data(d); + for (int i = 0; i < d; i++) { + data[i] = distrib_real(rng); + } + alg_hnsw->addPoint(data.data(), label); + } + } + + std::cout << "Index is created" << std::endl; + + bool stop_threads = false; + std::vector threads; + + // create threads that will do markDeleted and unmarkDeleted of random elements + // each thread works with specific range of labels + std::cout << "Starting markDeleted and unmarkDeleted threads" << std::endl; + num_threads = 20; + int chunk_size = max_elements / num_threads; + for (size_t thread_id = 0; thread_id < num_threads; thread_id++) { + threads.push_back( + std::thread( + [&, thread_id] { + std::uniform_int_distribution<> distrib_int(0, chunk_size - 1); + int start_id = thread_id * chunk_size; + std::vector marked_deleted(chunk_size); + while (!stop_threads) { + int id = distrib_int(rng); + hnswlib::labeltype label = start_id + id; + if (marked_deleted[id]) { + alg_hnsw->unmarkDelete(label); + marked_deleted[id] = false; + } else { + alg_hnsw->markDelete(label); + marked_deleted[id] = true; + } + } + } + ) + ); + } + + // create threads that will add and update random elements + std::cout << "Starting add and update elements threads" << std::endl; + num_threads = 20; + std::uniform_int_distribution<> distrib_int_add(max_elements, 2 * max_elements - 1); + for (size_t thread_id = 0; thread_id < num_threads; thread_id++) { + threads.push_back( + std::thread( + [&] { + std::vector data(d); + while (!stop_threads) { + hnswlib::labeltype label = distrib_int_add(rng); + for (int i = 0; i < d; i++) { + data[i] = distrib_real(rng); + } + alg_hnsw->addPoint(data.data(), label); + std::vector data = alg_hnsw->getDataByLabel(label); + float max_val = *max_element(data.begin(), data.end()); + // never happens but prevents compiler from deleting unused code + if (max_val > 10) { + throw std::runtime_error("Unexpected value in data"); + } + } + } + ) + ); + } + + std::cout << "Sleep and continue operations with index" << std::endl; + int sleep_ms = 60 * 1000; + std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms)); + stop_threads = true; + for (auto &thread : threads) { + thread.join(); } std::cout << "Finish" << std::endl; diff --git a/hnswlib/hnswalg.h b/hnswlib/hnswalg.h index 1f72c323..955a3e04 100644 --- a/hnswlib/hnswalg.h +++ b/hnswlib/hnswalg.h @@ -785,7 +785,7 @@ class HierarchicalNSW : public AlgorithmInterface { * Removes the deleted mark of the node, does NOT really change the current graph. * * Note: the method is not safe to use when replacement of deleted elements is enabled, - * bacause elements marked as deleted can be completely removed by addPointToVacantPlace + * because elements marked as deleted can be completely removed by addPointToVacantPlace */ void unmarkDelete(labeltype label) { // lock all operations with element by label @@ -895,7 +895,7 @@ class HierarchicalNSW : public AlgorithmInterface { * Adds point. Updates the point if it is already in the index * * Note: the method is not safe to use to update elements when replacement of deleted elements is enabled, - * bacause elements marked as deleted can be completely removed by addPointToVacantPlace: + * because elements marked as deleted can be completely removed by addPointToVacantPlace: */ void addPoint(const void *data_point, labeltype label) { // lock all operations with element by label From aabd0df49d2dbea129851d7d67f6b394e1d6a7cd Mon Sep 17 00:00:00 2001 From: Dmitry Yashunin Date: Tue, 13 Dec 2022 15:09:45 +0400 Subject: [PATCH 14/27] Addressing review comments --- hnswlib/bruteforce.h | 2 +- hnswlib/hnswalg.h | 42 +++------ hnswlib/hnswlib.h | 2 +- python_bindings/bindings.cpp | 88 ++----------------- .../tests/bindings_test_replace.py | 12 +-- .../tests/bindings_test_stress_mt_replace.py | 2 +- 6 files changed, 25 insertions(+), 123 deletions(-) diff --git a/hnswlib/bruteforce.h b/hnswlib/bruteforce.h index 21130090..30b33ae9 100644 --- a/hnswlib/bruteforce.h +++ b/hnswlib/bruteforce.h @@ -61,7 +61,7 @@ class BruteforceSearch : public AlgorithmInterface { } - void addPoint(const void *datapoint, labeltype label) { + void addPoint(const void *datapoint, labeltype label, bool replace_deleted = false) { int idx; { std::unique_lock lock(index_lock); diff --git a/hnswlib/hnswalg.h b/hnswlib/hnswalg.h index 955a3e04..9bd35ffa 100644 --- a/hnswlib/hnswalg.h +++ b/hnswlib/hnswalg.h @@ -64,10 +64,10 @@ class HierarchicalNSW : public AlgorithmInterface { mutable std::atomic metric_distance_computations{0}; mutable std::atomic metric_hops{0}; - bool replace_deleted_ = false; + bool replace_deleted_ = false; // flag to replace deleted elements (marked as deleted) during insertions std::mutex deleted_elements_lock; // lock for deleted_elements - std::unordered_set deleted_elements; + std::unordered_set deleted_elements; // contains internal ids of deleted elements HierarchicalNSW(SpaceInterface *s) { @@ -785,7 +785,7 @@ class HierarchicalNSW : public AlgorithmInterface { * Removes the deleted mark of the node, does NOT really change the current graph. * * Note: the method is not safe to use when replacement of deleted elements is enabled, - * because elements marked as deleted can be completely removed by addPointToVacantPlace + * because elements marked as deleted can be completely removed by addPoint */ void unmarkDelete(labeltype label) { // lock all operations with element by label @@ -843,21 +843,19 @@ class HierarchicalNSW : public AlgorithmInterface { /* - * Adds point and replaces previously deleted point if any, updating it with new point - * If deleted point was replaced returns its label, else returns label of added or updated point - * - * Note: - * Methods that can work with deleted elements unmarkDelete and addPoint are not safe to use - * with this method, because addPointToVacantPlace removes deleted elements from the index. + * Adds point. Updates the point if it is already in the index. + * If replacement of deleted elements is enabled: replaces previously deleted point if any, updating it with new point */ - labeltype addPointToVacantPlace(const void* data_point, labeltype label) { + void addPoint(const void *data_point, labeltype label, bool replace_deleted = false) { + if ((replace_deleted_ == false) && (replace_deleted == true)) { + throw std::runtime_error("Replacement of deleted elements is disabled in constructor"); + } + // lock all operations with element by label std::unique_lock lock_label(getLabelOpMutex(label)); - - if (!replace_deleted_) { - throw std::runtime_error("Can't use addPointToVacantPlace when replacement of deleted elements is disabled"); + if (!replace_deleted) { + addPoint(data_point, label, -1); } - // check if there is vacant place tableint internal_id_replaced; std::unique_lock lock_deleted_elements(deleted_elements_lock); @@ -872,7 +870,6 @@ class HierarchicalNSW : public AlgorithmInterface { // else add point to vacant place if (!is_vacant_place) { addPoint(data_point, label, -1); - return label; } else { // we assume that there are no concurrent operations on deleted element labeltype label_replaced = getExternalLabel(internal_id_replaced); @@ -885,25 +882,10 @@ class HierarchicalNSW : public AlgorithmInterface { unmarkDeletedInternal(internal_id_replaced); updatePoint(data_point, internal_id_replaced, 1.0); - - return label_replaced; } } - /* - * Adds point. Updates the point if it is already in the index - * - * Note: the method is not safe to use to update elements when replacement of deleted elements is enabled, - * because elements marked as deleted can be completely removed by addPointToVacantPlace: - */ - void addPoint(const void *data_point, labeltype label) { - // lock all operations with element by label - std::unique_lock lock_label(getLabelOpMutex(label)); - addPoint(data_point, label, -1); - } - - void updatePoint(const void *dataPoint, tableint internalId, float updateNeighborProbability) { // update the feature vector associated with existing point with new vector memcpy(getDataByInternalId(internalId), dataPoint, data_size_); diff --git a/hnswlib/hnswlib.h b/hnswlib/hnswlib.h index 72c955dc..fb7118fa 100644 --- a/hnswlib/hnswlib.h +++ b/hnswlib/hnswlib.h @@ -158,7 +158,7 @@ class SpaceInterface { template class AlgorithmInterface { public: - virtual void addPoint(const void *datapoint, labeltype label) = 0; + virtual void addPoint(const void *datapoint, labeltype label, bool replace_deleted = false) = 0; virtual std::priority_queue> searchKnn(const void*, size_t, BaseFilterFunctor* isIdAllowed = nullptr) const = 0; diff --git a/python_bindings/bindings.cpp b/python_bindings/bindings.cpp index 4ea89329..58e0379d 100644 --- a/python_bindings/bindings.cpp +++ b/python_bindings/bindings.cpp @@ -245,79 +245,7 @@ class Index { } - py::object add_items_to_vacant_place_return_numpy(py::object input, py::object ids_ = py::none(), int num_threads = -1) { - size_t rows, features; - hnswlib::labeltype* data_numpy_l = NULL; - - py::array_t < dist_t, py::array::c_style | py::array::forcecast > items(input); - auto buffer = items.request(); - if (num_threads <= 0) - num_threads = num_threads_default; - get_input_array_shapes(buffer, &rows, &features); - if (features != dim) - throw std::runtime_error("wrong dimensionality of the vectors"); - - // avoid using threads when the number of insertions is small: - if (rows <= num_threads * 4) { - num_threads = 1; - } - - std::vector ids = get_input_ids_and_check_shapes(ids_, rows); - - { - int start = 0; - data_numpy_l = new hnswlib::labeltype[rows]; - - if (!ep_added) { - size_t id = ids.size() ? ids.at(0) : (cur_l); - float* vector_data = (float*)items.data(0); - std::vector norm_array(dim); - if (normalize) { - normalize_vector(vector_data, norm_array.data()); - vector_data = norm_array.data(); - } - hnswlib::labeltype label = appr_alg->addPointToVacantPlace((void*)vector_data, (size_t)id); - data_numpy_l[start] = label; - start = 1; - ep_added = true; - } - - py::gil_scoped_release l; - if (normalize == false) { - ParallelFor(start, rows, num_threads, [&](size_t row, size_t threadId) { - size_t id = ids.size() ? ids.at(row) : (cur_l + row); - hnswlib::labeltype label = appr_alg->addPointToVacantPlace((void*)items.data(row), (size_t)id); - data_numpy_l[row] = label; - }); - } - else { - std::vector norm_array(num_threads * dim); - ParallelFor(start, rows, num_threads, [&](size_t row, size_t threadId) { - // normalize vector: - size_t start_idx = threadId * dim; - normalize_vector((float*)items.data(row), (norm_array.data() + start_idx)); - - size_t id = ids.size() ? ids.at(row) : (cur_l + row); - hnswlib::labeltype label = appr_alg->addPointToVacantPlace((void*)(norm_array.data() + start_idx), (size_t)id); - data_numpy_l[row] = label; - }); - } - cur_l += rows; - } - - py::capsule free_when_done_l(data_numpy_l, [](void* f) { - delete[] f; - }); - - return py::array_t( - { rows }, // shape - { sizeof(hnswlib::labeltype) }, // C-style contiguous strides for each index - data_numpy_l, // the data pointer - free_when_done_l); - } - - - void addItems(py::object input, py::object ids_ = py::none(), int num_threads = -1) { + void addItems(py::object input, py::object ids_ = py::none(), int num_threads = -1, bool replace_deleted = false) { py::array_t < dist_t, py::array::c_style | py::array::forcecast > items(input); auto buffer = items.request(); if (num_threads <= 0) @@ -346,7 +274,7 @@ class Index { normalize_vector(vector_data, norm_array.data()); vector_data = norm_array.data(); } - appr_alg->addPoint((void*)vector_data, (size_t)id); + appr_alg->addPoint((void*)vector_data, (size_t)id, replace_deleted); start = 1; ep_added = true; } @@ -355,7 +283,7 @@ class Index { if (normalize == false) { ParallelFor(start, rows, num_threads, [&](size_t row, size_t threadId) { size_t id = ids.size() ? ids.at(row) : (cur_l + row); - appr_alg->addPoint((void*)items.data(row), (size_t)id); + appr_alg->addPoint((void*)items.data(row), (size_t)id, replace_deleted); }); } else { std::vector norm_array(num_threads * dim); @@ -365,7 +293,7 @@ class Index { normalize_vector((float*)items.data(row), (norm_array.data() + start_idx)); size_t id = ids.size() ? ids.at(row) : (cur_l + row); - appr_alg->addPoint((void*)(norm_array.data() + start_idx), (size_t)id); + appr_alg->addPoint((void*)(norm_array.data() + start_idx), (size_t)id, replace_deleted); }); } cur_l += rows; @@ -969,12 +897,8 @@ PYBIND11_PLUGIN(hnswlib) { &Index::addItems, py::arg("data"), py::arg("ids") = py::none(), - py::arg("num_threads") = -1) - .def("add_items_to_vacant_place", - &Index::add_items_to_vacant_place_return_numpy, - py::arg("data"), - py::arg("ids") = py::none(), - py::arg("num_threads") = -1) + py::arg("num_threads") = -1, + py::arg("replace_deleted") = false) .def("get_items", &Index::getDataReturnList, py::arg("ids") = py::none()) .def("get_ids_list", &Index::getIdsList) .def("set_ef", &Index::set_ef, py::arg("ef")) diff --git a/python_bindings/tests/bindings_test_replace.py b/python_bindings/tests/bindings_test_replace.py index 63f7280f..7ebb03f4 100644 --- a/python_bindings/tests/bindings_test_replace.py +++ b/python_bindings/tests/bindings_test_replace.py @@ -72,17 +72,13 @@ def testRandomSelf(self): print("Inserting batch 3 by replacing deleted elements") # Maximum number of elements is reached therefore we cannot add new items # but we can replace the deleted ones - labels_replaced = hnsw_index.add_items_to_vacant_place(data3, labels3) - labels2_deleted_list = [l[0] for l in labels2_deleted] - labels_replaced_list = labels_replaced.tolist() - labels2_deleted_list.sort() - labels_replaced_list.sort() - self.assertSequenceEqual(labels2_deleted_list, labels_replaced_list) + hnsw_index.add_items(data3, labels3, replace_deleted=True) # After replacing, all labels should be retrievable print("Checking that remaining labels are in index") # Get remaining data from batch 1 and batch 2 after deletion of elements remaining_labels = set(labels1) | set(labels2) + labels2_deleted_list = [l[0] for l in labels2_deleted] remaining_labels = remaining_labels - set(labels2_deleted_list) remaining_labels_list = list(remaining_labels) comb_data = np.concatenate((data1, data2), axis=0) @@ -114,7 +110,7 @@ def testRandomSelf(self): # Insert batch 4 print("Inserting batch 4 by replacing deleted elements") - labels_replaced = hnsw_index.add_items_to_vacant_place(data4, labels4) + hnsw_index.add_items(data4, labels4, replace_deleted=True) # Check recall print("Checking recall") @@ -133,7 +129,7 @@ def testRandomSelf(self): del hnsw_index # Insert batch 3 print("Inserting batch 3 by replacing deleted elements") - labels_replaced = hnsw_index_pckl.add_items_to_vacant_place(data3, labels3) + hnsw_index_pckl.add_items(data3, labels3, replace_deleted=True) # Check recall print("Checking recall") diff --git a/python_bindings/tests/bindings_test_stress_mt_replace.py b/python_bindings/tests/bindings_test_stress_mt_replace.py index 551c6fe9..21f3d0be 100644 --- a/python_bindings/tests/bindings_test_stress_mt_replace.py +++ b/python_bindings/tests/bindings_test_stress_mt_replace.py @@ -58,5 +58,5 @@ def testRandomSelf(self): # Replace deleted elements # Maximum number of elements is reached therefore we cannot add new items # but we can replace the deleted ones - labels_replaced = hnsw_index.add_items_to_vacant_place(data3, labels3) + labels_replaced = hnsw_index.add_items(data3, labels3, replace_deleted=True) \ No newline at end of file From c26a45b0237afb02aef17a0a9d075e284e579d06 Mon Sep 17 00:00:00 2001 From: Dmitry Yashunin Date: Tue, 13 Dec 2022 15:17:10 +0400 Subject: [PATCH 15/27] Update python 3.6 version in actions to meet Ubuntu 22.04 --- .github/workflows/build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 153ebdb2..4951e984 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -8,7 +8,7 @@ jobs: strategy: matrix: os: [ubuntu-latest, windows-latest] - python-version: ["3.6", "3.7", "3.8", "3.9", "3.10"] + python-version: ["3.6.15", "3.7", "3.8", "3.9", "3.10"] steps: - uses: actions/checkout@v3 - uses: actions/setup-python@v4 From aaf5b5db448a338075d48aacd75651329471afe3 Mon Sep 17 00:00:00 2001 From: Dmitry Yashunin Date: Tue, 13 Dec 2022 15:20:36 +0400 Subject: [PATCH 16/27] Remove python 3.6 tests as it is not available in Ubuntu 22.04 --- .github/workflows/build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 4951e984..830631fd 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -8,7 +8,7 @@ jobs: strategy: matrix: os: [ubuntu-latest, windows-latest] - python-version: ["3.6.15", "3.7", "3.8", "3.9", "3.10"] + python-version: ["3.7", "3.8", "3.9", "3.10"] steps: - uses: actions/checkout@v3 - uses: actions/setup-python@v4 From 01bd9d08cfa3bb5570fea5ab186e6effefa30849 Mon Sep 17 00:00:00 2001 From: Dmitry Yashunin Date: Sat, 17 Dec 2022 14:56:53 +0400 Subject: [PATCH 17/27] Refactoring --- hnswlib/hnswalg.h | 20 ++++++++-------- python_bindings/bindings.cpp | 24 +++++++++---------- .../tests/bindings_test_replace.py | 4 ++-- .../tests/bindings_test_stress_mt_replace.py | 2 +- 4 files changed, 25 insertions(+), 25 deletions(-) diff --git a/hnswlib/hnswalg.h b/hnswlib/hnswalg.h index 9bd35ffa..70d9a5de 100644 --- a/hnswlib/hnswalg.h +++ b/hnswlib/hnswalg.h @@ -64,7 +64,7 @@ class HierarchicalNSW : public AlgorithmInterface { mutable std::atomic metric_distance_computations{0}; mutable std::atomic metric_hops{0}; - bool replace_deleted_ = false; // flag to replace deleted elements (marked as deleted) during insertions + bool allow_replace_deleted_ = false; // flag to replace deleted elements (marked as deleted) during insertions std::mutex deleted_elements_lock; // lock for deleted_elements std::unordered_set deleted_elements; // contains internal ids of deleted elements @@ -79,8 +79,8 @@ class HierarchicalNSW : public AlgorithmInterface { const std::string &location, bool nmslib = false, size_t max_elements = 0, - bool replace_deleted = false) - : replace_deleted_(replace_deleted) { + bool allow_replace_deleted = false) + : allow_replace_deleted_(allow_replace_deleted) { loadIndex(location, s, max_elements); } @@ -91,11 +91,11 @@ class HierarchicalNSW : public AlgorithmInterface { size_t M = 16, size_t ef_construction = 200, size_t random_seed = 100, - bool replace_deleted = false) + bool allow_replace_deleted = false) : link_list_locks_(max_elements), label_op_locks_(MAX_LABEL_OPERATION_LOCKS), element_levels_(max_elements), - replace_deleted_(replace_deleted) { + allow_replace_deleted_(allow_replace_deleted) { max_elements_ = max_elements; num_deleted_ = 0; data_size_ = s->get_data_size(); @@ -707,7 +707,7 @@ class HierarchicalNSW : public AlgorithmInterface { for (size_t i = 0; i < cur_element_count; i++) { if (isMarkedDeleted(i)) { num_deleted_ += 1; - if (replace_deleted_) deleted_elements.insert(i); + if (allow_replace_deleted_) deleted_elements.insert(i); } } @@ -771,7 +771,7 @@ class HierarchicalNSW : public AlgorithmInterface { unsigned char *ll_cur = ((unsigned char *)get_linklist0(internalId))+2; *ll_cur |= DELETE_MARK; num_deleted_ += 1; - if (replace_deleted_) { + if (allow_replace_deleted_) { std::unique_lock lock_deleted_elements(deleted_elements_lock); deleted_elements.insert(internalId); } @@ -813,7 +813,7 @@ class HierarchicalNSW : public AlgorithmInterface { unsigned char *ll_cur = ((unsigned char *)get_linklist0(internalId)) + 2; *ll_cur &= ~DELETE_MARK; num_deleted_ -= 1; - if (replace_deleted_) { + if (allow_replace_deleted_) { std::unique_lock lock_deleted_elements(deleted_elements_lock); deleted_elements.erase(internalId); } @@ -847,7 +847,7 @@ class HierarchicalNSW : public AlgorithmInterface { * If replacement of deleted elements is enabled: replaces previously deleted point if any, updating it with new point */ void addPoint(const void *data_point, labeltype label, bool replace_deleted = false) { - if ((replace_deleted_ == false) && (replace_deleted == true)) { + if ((allow_replace_deleted_ == false) && (replace_deleted == true)) { throw std::runtime_error("Replacement of deleted elements is disabled in constructor"); } @@ -1053,7 +1053,7 @@ class HierarchicalNSW : public AlgorithmInterface { auto search = label_lookup_.find(label); if (search != label_lookup_.end()) { tableint existingInternalId = search->second; - if (replace_deleted_) { + if (allow_replace_deleted_) { if (isMarkedDeleted(existingInternalId)) { throw std::runtime_error("Can't use addPoint to update deleted elements if replacement of deleted elements is enabled."); } diff --git a/python_bindings/bindings.cpp b/python_bindings/bindings.cpp index 58e0379d..3196a228 100644 --- a/python_bindings/bindings.cpp +++ b/python_bindings/bindings.cpp @@ -194,12 +194,12 @@ class Index { size_t M, size_t efConstruction, size_t random_seed, - bool replace_deleted) { + bool allow_replace_deleted) { if (appr_alg) { throw std::runtime_error("The index is already initiated."); } cur_l = 0; - appr_alg = new hnswlib::HierarchicalNSW(l2space, maxElements, M, efConstruction, random_seed, replace_deleted); + appr_alg = new hnswlib::HierarchicalNSW(l2space, maxElements, M, efConstruction, random_seed, allow_replace_deleted); index_inited = true; ep_added = false; appr_alg->ef_ = default_ef; @@ -224,12 +224,12 @@ class Index { } - void loadIndex(const std::string &path_to_index, size_t max_elements, bool replace_deleted) { + void loadIndex(const std::string &path_to_index, size_t max_elements, bool allow_replace_deleted) { if (appr_alg) { std::cerr << "Warning: Calling load_index for an already inited index. Old index is being deallocated." << std::endl; delete appr_alg; } - appr_alg = new hnswlib::HierarchicalNSW(l2space, path_to_index, false, max_elements, replace_deleted); + appr_alg = new hnswlib::HierarchicalNSW(l2space, path_to_index, false, max_elements, allow_replace_deleted); cur_l = appr_alg->cur_element_count; index_inited = true; } @@ -415,7 +415,7 @@ class Index { "ef"_a = appr_alg->ef_, "has_deletions"_a = (bool)appr_alg->num_deleted_, "size_links_per_element"_a = appr_alg->size_links_per_element_, - "replace_deleted"_a = appr_alg->replace_deleted_, + "allow_replace_deleted"_a = appr_alg->allow_replace_deleted_, "label_lookup_external"_a = py::array_t( { appr_alg->label_lookup_.size() }, // shape @@ -578,11 +578,11 @@ class Index { } // process deleted elements - bool replace_deleted = false; - if (d.contains("replace_deleted")) { - replace_deleted = d["replace_deleted"].cast(); + bool allow_replace_deleted = false; + if (d.contains("allow_replace_deleted")) { + allow_replace_deleted = d["allow_replace_deleted"].cast(); } - appr_alg->replace_deleted_= replace_deleted; + appr_alg->allow_replace_deleted_= allow_replace_deleted; appr_alg->num_deleted_ = 0; bool has_deletions = d["has_deletions"].cast(); @@ -590,7 +590,7 @@ class Index { for (size_t i = 0; i < appr_alg->cur_element_count; i++) { if (appr_alg->isMarkedDeleted(i)) { appr_alg->num_deleted_ += 1; - if (replace_deleted) appr_alg->deleted_elements.insert(i); + if (allow_replace_deleted) appr_alg->deleted_elements.insert(i); } } } @@ -886,7 +886,7 @@ PYBIND11_PLUGIN(hnswlib) { py::arg("M") = 16, py::arg("ef_construction") = 200, py::arg("random_seed") = 100, - py::arg("replace_deleted") = false) + py::arg("allow_replace_deleted") = false) .def("knn_query", &Index::knnQuery_return_numpy, py::arg("data"), @@ -908,7 +908,7 @@ PYBIND11_PLUGIN(hnswlib) { &Index::loadIndex, py::arg("path_to_index"), py::arg("max_elements") = 0, - py::arg("replace_deleted") = false) + py::arg("allow_replace_deleted") = false) .def("mark_deleted", &Index::markDeleted, py::arg("label")) .def("unmark_deleted", &Index::unmarkDeleted, py::arg("label")) .def("resize_index", &Index::resizeIndex, py::arg("new_size")) diff --git a/python_bindings/tests/bindings_test_replace.py b/python_bindings/tests/bindings_test_replace.py index 7ebb03f4..ac92b10b 100644 --- a/python_bindings/tests/bindings_test_replace.py +++ b/python_bindings/tests/bindings_test_replace.py @@ -40,7 +40,7 @@ def testRandomSelf(self): # Declaring index hnsw_index = hnswlib.Index(space='l2', dim=dim) - hnsw_index.init_index(max_elements=max_num_elements, ef_construction=200, M=16, replace_deleted=True) + hnsw_index.init_index(max_elements=max_num_elements, ef_construction=200, M=16, allow_replace_deleted=True) hnsw_index.set_ef(100) hnsw_index.set_num_threads(4) @@ -106,7 +106,7 @@ def testRandomSelf(self): hnsw_index = hnswlib.Index(space='l2', dim=dim) # the space can be changed - keeps the data, alters the distance function. hnsw_index.set_num_threads(4) print(f"Loading index from {index_path}") - hnsw_index.load_index(index_path, max_elements=max_num_elements, replace_deleted=True) + hnsw_index.load_index(index_path, max_elements=max_num_elements, allow_replace_deleted=True) # Insert batch 4 print("Inserting batch 4 by replacing deleted elements") diff --git a/python_bindings/tests/bindings_test_stress_mt_replace.py b/python_bindings/tests/bindings_test_stress_mt_replace.py index 21f3d0be..1a92ebc5 100644 --- a/python_bindings/tests/bindings_test_stress_mt_replace.py +++ b/python_bindings/tests/bindings_test_stress_mt_replace.py @@ -31,7 +31,7 @@ def testRandomSelf(self): # Declaring index for _ in range(100): hnsw_index = hnswlib.Index(space='l2', dim=dim) - hnsw_index.init_index(max_elements=max_num_elements, ef_construction=200, M=16, replace_deleted=True) + hnsw_index.init_index(max_elements=max_num_elements, ef_construction=200, M=16, allow_replace_deleted=True) hnsw_index.set_ef(100) hnsw_index.set_num_threads(50) From 50bac85b1db75812baebf518683f4108c2f4de74 Mon Sep 17 00:00:00 2001 From: Dmitry Yashunin Date: Sun, 18 Dec 2022 15:50:23 +0400 Subject: [PATCH 18/27] Fix code and tests --- hnswlib/hnswalg.h | 1 + python_bindings/tests/bindings_test_labels.py | 25 +++++----- .../tests/bindings_test_replace.py | 47 +++++++++++-------- .../tests/bindings_test_stress_mt_replace.py | 18 ++++--- 4 files changed, 53 insertions(+), 38 deletions(-) diff --git a/hnswlib/hnswalg.h b/hnswlib/hnswalg.h index 70d9a5de..69997bd4 100644 --- a/hnswlib/hnswalg.h +++ b/hnswlib/hnswalg.h @@ -855,6 +855,7 @@ class HierarchicalNSW : public AlgorithmInterface { std::unique_lock lock_label(getLabelOpMutex(label)); if (!replace_deleted) { addPoint(data_point, label, -1); + return; } // check if there is vacant place tableint internal_id_replaced; diff --git a/python_bindings/tests/bindings_test_labels.py b/python_bindings/tests/bindings_test_labels.py index 2b091371..524a24d5 100644 --- a/python_bindings/tests/bindings_test_labels.py +++ b/python_bindings/tests/bindings_test_labels.py @@ -95,19 +95,20 @@ def testRandomSelf(self): # Delete data1 labels1_deleted, _ = p.knn_query(data1, k=1) - - for l in labels1_deleted: - p.mark_deleted(l[0]) + # delete probable duplicates from nearest neighbors + labels1_deleted_no_dup = set(labels1_deleted.flatten()) + for l in labels1_deleted_no_dup: + p.mark_deleted(l) labels2, _ = p.knn_query(data2, k=1) items = p.get_items(labels2) diff_with_gt_labels = np.mean(np.abs(data2-items)) - self.assertAlmostEqual(diff_with_gt_labels, 0, delta=1e-3) # console + self.assertAlmostEqual(diff_with_gt_labels, 0, delta=1e-3) labels1_after, _ = p.knn_query(data1, k=1) for la in labels1_after: - for lb in labels1_deleted: - if la[0] == lb[0]: - self.assertTrue(False) + if la[0] in labels1_deleted_no_dup: + print(f"Found deleted label {la[0]} during knn search") + self.assertTrue(False) print("All the data in data1 are removed") # Checking saving/loading index with elements marked as deleted @@ -119,13 +120,13 @@ def testRandomSelf(self): labels1_after, _ = p.knn_query(data1, k=1) for la in labels1_after: - for lb in labels1_deleted: - if la[0] == lb[0]: - self.assertTrue(False) + if la[0] in labels1_deleted_no_dup: + print(f"Found deleted label {la[0]} during knn search after index loading") + self.assertTrue(False) # Unmark deleted data - for l in labels1_deleted: - p.unmark_deleted(l[0]) + for l in labels1_deleted_no_dup: + p.unmark_deleted(l) labels_restored, _ = p.knn_query(data1, k=1) self.assertAlmostEqual(np.mean(labels_restored.reshape(-1) == np.arange(len(data1))), 1.0, 3) print("All the data in data1 are restored") diff --git a/python_bindings/tests/bindings_test_replace.py b/python_bindings/tests/bindings_test_replace.py index ac92b10b..754ad668 100644 --- a/python_bindings/tests/bindings_test_replace.py +++ b/python_bindings/tests/bindings_test_replace.py @@ -54,32 +54,37 @@ def testRandomSelf(self): # Delete nearest neighbors of batch 2 print("Deleting neighbors of batch 2") labels2_deleted, _ = hnsw_index.knn_query(data2, k=1) - for l in labels2_deleted: - hnsw_index.mark_deleted(l[0]) + # delete probable duplicates from nearest neighbors + labels2_deleted_no_dup = set(labels2_deleted.flatten()) + num_duplicates = len(labels2_deleted) - len(labels2_deleted_no_dup) + for l in labels2_deleted_no_dup: + hnsw_index.mark_deleted(l) labels1_found, _ = hnsw_index.knn_query(data1, k=1) items = hnsw_index.get_items(labels1_found) - diff_with_gt_labels = np.mean(np.abs(data1-items)) + diff_with_gt_labels = np.mean(np.abs(data1 - items)) self.assertAlmostEqual(diff_with_gt_labels, 0, delta=1e-3) labels2_after, _ = hnsw_index.knn_query(data2, k=1) for la in labels2_after: - for lb in labels2_deleted: - if la[0] == lb[0]: - self.assertTrue(False) + if la[0] in labels2_deleted_no_dup: + print(f"Found deleted label {la[0]} during knn search") + self.assertTrue(False) print("All the neighbors of data2 are removed") # Replace deleted elements print("Inserting batch 3 by replacing deleted elements") # Maximum number of elements is reached therefore we cannot add new items # but we can replace the deleted ones - hnsw_index.add_items(data3, labels3, replace_deleted=True) + # Note: there may be less than num_elements elements. + # As we could delete less than num_elements because of duplicates + labels3_tr = labels3[0:labels3.shape[0] - num_duplicates] + data3_tr = data3[0:data3.shape[0] - num_duplicates] + hnsw_index.add_items(data3_tr, labels3_tr, replace_deleted=True) # After replacing, all labels should be retrievable print("Checking that remaining labels are in index") # Get remaining data from batch 1 and batch 2 after deletion of elements - remaining_labels = set(labels1) | set(labels2) - labels2_deleted_list = [l[0] for l in labels2_deleted] - remaining_labels = remaining_labels - set(labels2_deleted_list) + remaining_labels = (set(labels1) | set(labels2)) - labels2_deleted_no_dup remaining_labels_list = list(remaining_labels) comb_data = np.concatenate((data1, data2), axis=0) remaining_data = comb_data[remaining_labels_list] @@ -87,13 +92,13 @@ def testRandomSelf(self): returned_items = hnsw_index.get_items(remaining_labels_list) self.assertSequenceEqual(remaining_data.tolist(), returned_items) - returned_items = hnsw_index.get_items(labels3) - self.assertSequenceEqual(data3.tolist(), returned_items) + returned_items = hnsw_index.get_items(labels3_tr) + self.assertSequenceEqual(data3_tr.tolist(), returned_items) # Check index serialization # Delete batch 3 print("Deleting batch 3") - for l in labels3: + for l in labels3_tr: hnsw_index.mark_deleted(l) # Save index @@ -110,18 +115,20 @@ def testRandomSelf(self): # Insert batch 4 print("Inserting batch 4 by replacing deleted elements") - hnsw_index.add_items(data4, labels4, replace_deleted=True) + labels4_tr = labels4[0:labels4.shape[0] - num_duplicates] + data4_tr = data4[0:data4.shape[0] - num_duplicates] + hnsw_index.add_items(data4_tr, labels4_tr, replace_deleted=True) # Check recall print("Checking recall") - labels_found, _ = hnsw_index.knn_query(data4, k=1) - recall = np.mean(labels_found.reshape(-1) == labels4) + labels_found, _ = hnsw_index.knn_query(data4_tr, k=1) + recall = np.mean(labels_found.reshape(-1) == labels4_tr) print(f"Recall for the 4 batch: {recall}") self.assertGreater(recall, recall_threshold) # Delete batch 4 print("Deleting batch 4") - for l in labels4: + for l in labels4_tr: hnsw_index.mark_deleted(l) print("Testing pickle serialization") @@ -129,12 +136,12 @@ def testRandomSelf(self): del hnsw_index # Insert batch 3 print("Inserting batch 3 by replacing deleted elements") - hnsw_index_pckl.add_items(data3, labels3, replace_deleted=True) + hnsw_index_pckl.add_items(data3_tr, labels3_tr, replace_deleted=True) # Check recall print("Checking recall") - labels_found, _ = hnsw_index_pckl.knn_query(data3, k=1) - recall = np.mean(labels_found.reshape(-1) == labels3) + labels_found, _ = hnsw_index_pckl.knn_query(data3_tr, k=1) + recall = np.mean(labels_found.reshape(-1) == labels3_tr) print(f"Recall for the 3 batch: {recall}") self.assertGreater(recall, recall_threshold) diff --git a/python_bindings/tests/bindings_test_stress_mt_replace.py b/python_bindings/tests/bindings_test_stress_mt_replace.py index 1a92ebc5..0c56c595 100644 --- a/python_bindings/tests/bindings_test_stress_mt_replace.py +++ b/python_bindings/tests/bindings_test_stress_mt_replace.py @@ -42,21 +42,27 @@ def testRandomSelf(self): # Delete nearest neighbors of batch 2 labels2_deleted, _ = hnsw_index.knn_query(data2, k=1) - for l in labels2_deleted: - hnsw_index.mark_deleted(l[0]) + labels2_deleted_flat = labels2_deleted.flatten() + # delete probable duplicates from nearest neighbors + labels2_deleted_no_dup = set(labels2_deleted_flat) + for l in labels2_deleted_no_dup: + hnsw_index.mark_deleted(l) labels1_found, _ = hnsw_index.knn_query(data1, k=1) items = hnsw_index.get_items(labels1_found) - diff_with_gt_labels = np.mean(np.abs(data1-items)) + diff_with_gt_labels = np.mean(np.abs(data1 - items)) self.assertAlmostEqual(diff_with_gt_labels, 0, delta=1e-3) labels2_after, _ = hnsw_index.knn_query(data2, k=1) labels2_after_flat = labels2_after.flatten() - labels2_deleted_flat = labels2_deleted.flatten() common = np.intersect1d(labels2_after_flat, labels2_deleted_flat) self.assertTrue(common.size == 0) # Replace deleted elements # Maximum number of elements is reached therefore we cannot add new items # but we can replace the deleted ones - labels_replaced = hnsw_index.add_items(data3, labels3, replace_deleted=True) - \ No newline at end of file + # Note: there may be less than num_elements elements. + # As we could delete less than num_elements because of duplicates + num_duplicates = len(labels2_deleted) - len(labels2_deleted_no_dup) + labels3_tr = labels3[0:labels3.shape[0] - num_duplicates] + data3_tr = data3[0:data3.shape[0] - num_duplicates] + hnsw_index.add_items(data3_tr, labels3_tr, replace_deleted=True) From 2711a61904d8a4816f793f16cbd132de55fbbd36 Mon Sep 17 00:00:00 2001 From: Dmitry Yashunin Date: Sun, 18 Dec 2022 19:54:19 +0400 Subject: [PATCH 19/27] Multithread test to check udate of deleted elements --- .github/workflows/build.yml | 1 + CMakeLists.txt | 5 +- examples/multiThread_replace_test.cpp | 118 ++++++++++++++++++++++++++ 3 files changed, 123 insertions(+), 1 deletion(-) create mode 100644 examples/multiThread_replace_test.cpp diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 830631fd..e86d2545 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -62,6 +62,7 @@ jobs: ./searchKnnCloserFirst_test ./searchKnnWithFilter_test ./multiThreadLoad_test + ./multiThread_replace_test ./test_updates ./test_updates update shell: bash diff --git a/CMakeLists.txt b/CMakeLists.txt index 932a2872..45d117ae 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -9,7 +9,7 @@ if(CMAKE_PROJECT_NAME STREQUAL PROJECT_NAME) set(CMAKE_CXX_STANDARD 11) if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang") - SET( CMAKE_CXX_FLAGS "-Ofast -DNDEBUG -std=c++11 -DHAVE_CXX0X -openmp -march=native -fpic -ftree-vectorize") + SET( CMAKE_CXX_FLAGS "-Ofast -DNDEBUG -std=c++11 -DHAVE_CXX0X -openmp -mcpu=apple-m1 -fpic -ftree-vectorize") elseif (CMAKE_CXX_COMPILER_ID STREQUAL "GNU") SET( CMAKE_CXX_FLAGS "-Ofast -lrt -DNDEBUG -std=c++11 -DHAVE_CXX0X -march=native -fpic -w -fopenmp -ftree-vectorize -ftree-vectorizer-verbose=0" ) elseif (CMAKE_CXX_COMPILER_ID STREQUAL "MSVC") @@ -28,6 +28,9 @@ if(CMAKE_PROJECT_NAME STREQUAL PROJECT_NAME) add_executable(multiThreadLoad_test examples/multiThreadLoad_test.cpp) target_link_libraries(multiThreadLoad_test hnswlib) + add_executable(multiThread_replace_test examples/multiThread_replace_test.cpp) + target_link_libraries(multiThread_replace_test hnswlib) + add_executable(main main.cpp sift_1b.cpp) target_link_libraries(main hnswlib) endif() diff --git a/examples/multiThread_replace_test.cpp b/examples/multiThread_replace_test.cpp new file mode 100644 index 00000000..192718b8 --- /dev/null +++ b/examples/multiThread_replace_test.cpp @@ -0,0 +1,118 @@ +#include "../hnswlib/hnswlib.h" +#include +#include + + +template +inline void ParallelFor(size_t start, size_t end, size_t numThreads, Function fn) { + if (numThreads <= 0) { + numThreads = std::thread::hardware_concurrency(); + } + + if (numThreads == 1) { + for (size_t id = start; id < end; id++) { + fn(id, 0); + } + } else { + std::vector threads; + std::atomic current(start); + + // keep track of exceptions in threads + // https://stackoverflow.com/a/32428427/1713196 + std::exception_ptr lastException = nullptr; + std::mutex lastExceptMutex; + + for (size_t threadId = 0; threadId < numThreads; ++threadId) { + threads.push_back(std::thread([&, threadId] { + while (true) { + size_t id = current.fetch_add(1); + + if (id >= end) { + break; + } + + try { + fn(id, threadId); + } catch (...) { + std::unique_lock lastExcepLock(lastExceptMutex); + lastException = std::current_exception(); + /* + * This will work even when current is the largest value that + * size_t can fit, because fetch_add returns the previous value + * before the increment (what will result in overflow + * and produce 0 instead of current + 1). + */ + current = end; + break; + } + } + })); + } + for (auto &thread : threads) { + thread.join(); + } + if (lastException) { + std::rethrow_exception(lastException); + } + } +} + + +int main() { + std::cout << "Running multithread load test" << std::endl; + int d = 16; + int num_elements = 1000; + int max_elements = 2 * num_elements; + int num_threads = 50; + + std::mt19937 rng; + rng.seed(47); + std::uniform_real_distribution<> distrib_real; + + hnswlib::L2Space space(d); + + float batch1[d * max_elements]; + for (int i = 0; i < d * max_elements; i++) { + batch1[i] = distrib_real(rng); + } + float batch2[d * num_elements]; + for (int i = 0; i < d * num_elements; i++) { + batch2[i] = distrib_real(rng); + } + + std::vector rand_labels(max_elements); + for (int i = 0; i < max_elements; i++) { + rand_labels[i] = i; + } + std::shuffle(rand_labels.begin(), rand_labels.end(), rng); + + int iter = 0; + while (iter < 1000) { + std::cout << iter << std::endl; + + hnswlib::HierarchicalNSW* alg_hnsw = new hnswlib::HierarchicalNSW(&space, max_elements, 16, 200, 123, true); + + std::cout << "Building index\n"; + ParallelFor(0, max_elements, num_threads, [&](size_t row, size_t threadId) { + alg_hnsw->addPoint((void*)(batch1 + d * row), row); + }); + + std::cout << "Deleting\n"; + for (int i = 0; i < num_elements; i++) { + alg_hnsw->markDelete(rand_labels[i]); + } + + std::cout << "Updating elements\n"; + ParallelFor(0, num_elements, num_threads, [&](size_t row, size_t threadId) { + int label = rand_labels[row] + 10000; + alg_hnsw->addPoint((void*)(batch2 + d * row), label, true); + }); + + iter += 1; + + delete alg_hnsw; + } + + std::cout << "Finish" << std::endl; + return 0; +} From a31bd4ee912b29d3ae19451b1633bd6f69e0b26e Mon Sep 17 00:00:00 2001 From: Dmitry Yashunin Date: Sun, 18 Dec 2022 20:03:20 +0400 Subject: [PATCH 20/27] Refactoring --- examples/multiThread_replace_test.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/multiThread_replace_test.cpp b/examples/multiThread_replace_test.cpp index 192718b8..618ce3dd 100644 --- a/examples/multiThread_replace_test.cpp +++ b/examples/multiThread_replace_test.cpp @@ -104,7 +104,7 @@ int main() { std::cout << "Updating elements\n"; ParallelFor(0, num_elements, num_threads, [&](size_t row, size_t threadId) { - int label = rand_labels[row] + 10000; + int label = rand_labels[row] + max_elements; alg_hnsw->addPoint((void*)(batch2 + d * row), label, true); }); From 61ad82570d2c935d40c027dfbf1f91a8ebafccd5 Mon Sep 17 00:00:00 2001 From: Dmitry Yashunin Date: Sun, 18 Dec 2022 20:15:39 +0400 Subject: [PATCH 21/27] Fix compile issues --- examples/multiThread_replace_test.cpp | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/examples/multiThread_replace_test.cpp b/examples/multiThread_replace_test.cpp index 618ce3dd..7d039692 100644 --- a/examples/multiThread_replace_test.cpp +++ b/examples/multiThread_replace_test.cpp @@ -71,11 +71,11 @@ int main() { hnswlib::L2Space space(d); - float batch1[d * max_elements]; + float* batch1 = new float[d * max_elements]; for (int i = 0; i < d * max_elements; i++) { batch1[i] = distrib_real(rng); } - float batch2[d * num_elements]; + float* batch2 = new float[d * num_elements]; for (int i = 0; i < d * num_elements; i++) { batch2[i] = distrib_real(rng); } @@ -114,5 +114,8 @@ int main() { } std::cout << "Finish" << std::endl; + + delete[] batch1; + delete[] batch2; return 0; } From d36fe803c184ea597f41f0be27b9c3d9b5167678 Mon Sep 17 00:00:00 2001 From: Dmitry Yashunin Date: Sat, 7 Jan 2023 23:00:24 +0400 Subject: [PATCH 22/27] Fix update of elements --- examples/multiThread_replace_test.cpp | 5 +++++ hnswlib/hnswalg.h | 4 ++++ 2 files changed, 9 insertions(+) diff --git a/examples/multiThread_replace_test.cpp b/examples/multiThread_replace_test.cpp index 7d039692..3f126cde 100644 --- a/examples/multiThread_replace_test.cpp +++ b/examples/multiThread_replace_test.cpp @@ -71,6 +71,7 @@ int main() { hnswlib::L2Space space(d); + // generate batch1 and batch2 data float* batch1 = new float[d * max_elements]; for (int i = 0; i < d * max_elements; i++) { batch1[i] = distrib_real(rng); @@ -80,6 +81,7 @@ int main() { batch2[i] = distrib_real(rng); } + // generate random labels to delete them from index std::vector rand_labels(max_elements); for (int i = 0; i < max_elements; i++) { rand_labels[i] = i; @@ -92,16 +94,19 @@ int main() { hnswlib::HierarchicalNSW* alg_hnsw = new hnswlib::HierarchicalNSW(&space, max_elements, 16, 200, 123, true); + // add batch1 data std::cout << "Building index\n"; ParallelFor(0, max_elements, num_threads, [&](size_t row, size_t threadId) { alg_hnsw->addPoint((void*)(batch1 + d * row), row); }); + // delete batch1 data std::cout << "Deleting\n"; for (int i = 0; i < num_elements; i++) { alg_hnsw->markDelete(rand_labels[i]); } + // replace batch1 data with batch2 data std::cout << "Updating elements\n"; ParallelFor(0, num_elements, num_threads, [&](size_t row, size_t threadId) { int label = rand_labels[row] + max_elements; diff --git a/hnswlib/hnswalg.h b/hnswlib/hnswalg.h index 69997bd4..d7487096 100644 --- a/hnswlib/hnswalg.h +++ b/hnswlib/hnswalg.h @@ -451,6 +451,10 @@ class HierarchicalNSW : public AlgorithmInterface { tableint next_closest_entry_point = selectedNeighbors.back(); { + std::unique_lock lock(link_list_locks_[cur_c], std::defer_lock); + if (isUpdate) { + lock.lock(); + } linklistsizeint *ll_cur; if (level == 0) ll_cur = get_linklist0(cur_c); From a188fce273317a8b464976ed69612ec1e967625d Mon Sep 17 00:00:00 2001 From: Dmitry Yashunin Date: Sat, 7 Jan 2023 23:08:43 +0400 Subject: [PATCH 23/27] Update test params --- examples/multiThread_replace_test.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/multiThread_replace_test.cpp b/examples/multiThread_replace_test.cpp index 3f126cde..80a29ad8 100644 --- a/examples/multiThread_replace_test.cpp +++ b/examples/multiThread_replace_test.cpp @@ -89,7 +89,7 @@ int main() { std::shuffle(rand_labels.begin(), rand_labels.end(), rng); int iter = 0; - while (iter < 1000) { + while (iter < 200) { std::cout << iter << std::endl; hnswlib::HierarchicalNSW* alg_hnsw = new hnswlib::HierarchicalNSW(&space, max_elements, 16, 200, 123, true); From 5b67a3877013a2d61fe67061ccd9168d51531222 Mon Sep 17 00:00:00 2001 From: Dmitry Yashunin Date: Sun, 8 Jan 2023 12:38:43 +0400 Subject: [PATCH 24/27] Update CMakeLists.txt fix cmake --- CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 45d117ae..de951171 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -9,7 +9,7 @@ if(CMAKE_PROJECT_NAME STREQUAL PROJECT_NAME) set(CMAKE_CXX_STANDARD 11) if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang") - SET( CMAKE_CXX_FLAGS "-Ofast -DNDEBUG -std=c++11 -DHAVE_CXX0X -openmp -mcpu=apple-m1 -fpic -ftree-vectorize") + SET( CMAKE_CXX_FLAGS "-Ofast -DNDEBUG -std=c++11 -DHAVE_CXX0X -openmp -march=native -fpic -ftree-vectorize") elseif (CMAKE_CXX_COMPILER_ID STREQUAL "GNU") SET( CMAKE_CXX_FLAGS "-Ofast -lrt -DNDEBUG -std=c++11 -DHAVE_CXX0X -march=native -fpic -w -fopenmp -ftree-vectorize -ftree-vectorizer-verbose=0" ) elseif (CMAKE_CXX_COMPILER_ID STREQUAL "MSVC") From e4e97bf663906a3de35eee8877031efed9ca6034 Mon Sep 17 00:00:00 2001 From: Dmitry Yashunin Date: Sun, 8 Jan 2023 13:34:51 +0400 Subject: [PATCH 25/27] Add test to check recall with replaced elements --- examples/multiThread_replace_test.cpp | 4 +- .../tests/bindings_test_replace.py | 97 +++++++++++++++++++ 2 files changed, 99 insertions(+), 2 deletions(-) diff --git a/examples/multiThread_replace_test.cpp b/examples/multiThread_replace_test.cpp index 80a29ad8..d5c68e24 100644 --- a/examples/multiThread_replace_test.cpp +++ b/examples/multiThread_replace_test.cpp @@ -100,13 +100,13 @@ int main() { alg_hnsw->addPoint((void*)(batch1 + d * row), row); }); - // delete batch1 data + // delete half random elements of batch1 data std::cout << "Deleting\n"; for (int i = 0; i < num_elements; i++) { alg_hnsw->markDelete(rand_labels[i]); } - // replace batch1 data with batch2 data + // replace deleted elements with batch2 data std::cout << "Updating elements\n"; ParallelFor(0, num_elements, num_threads, [&](size_t row, size_t threadId) { int label = rand_labels[row] + max_elements; diff --git a/python_bindings/tests/bindings_test_replace.py b/python_bindings/tests/bindings_test_replace.py index 754ad668..80003a3a 100644 --- a/python_bindings/tests/bindings_test_replace.py +++ b/python_bindings/tests/bindings_test_replace.py @@ -9,6 +9,10 @@ class RandomSelfTestCase(unittest.TestCase): def testRandomSelf(self): + """ + Tests if replace of deleted elements works correctly + Tests serialization of the index with replaced elements + """ dim = 16 num_elements = 5000 max_num_elements = 2 * num_elements @@ -146,3 +150,96 @@ def testRandomSelf(self): self.assertGreater(recall, recall_threshold) os.remove(index_path) + + + def test_recall_degradation(self): + """ + Compares recall of the index with replaced elements and without + Measures recall degradation + """ + dim = 16 + num_elements = 10_000 + max_num_elements = 2 * num_elements + query_size = 1_000 + k = 100 + + recall_threshold = 0.98 + max_recall_diff = 0.02 + + # Generating sample data + print("Generating data") + # batch 1 + first_id = 0 + last_id = num_elements + labels1 = np.arange(first_id, last_id) + data1 = np.float32(np.random.random((num_elements, dim))) + # batch 2 + first_id += num_elements + last_id += num_elements + labels2 = np.arange(first_id, last_id) + data2 = np.float32(np.random.random((num_elements, dim))) + # batch 3 + first_id += num_elements + last_id += num_elements + labels3 = np.arange(first_id, last_id) + data3 = np.float32(np.random.random((num_elements, dim))) + # query to test recall + query_data = np.float32(np.random.random((query_size, dim))) + + # Declaring index + hnsw_index_no_replace = hnswlib.Index(space='l2', dim=dim) + hnsw_index_no_replace.init_index(max_elements=max_num_elements, ef_construction=200, M=16, allow_replace_deleted=False) + hnsw_index_with_replace = hnswlib.Index(space='l2', dim=dim) + hnsw_index_with_replace.init_index(max_elements=max_num_elements, ef_construction=200, M=16, allow_replace_deleted=True) + + bf_index = hnswlib.BFIndex(space='l2', dim=dim) + bf_index.init_index(max_elements=max_num_elements) + + hnsw_index_no_replace.set_ef(100) + hnsw_index_no_replace.set_num_threads(50) + hnsw_index_with_replace.set_ef(100) + hnsw_index_with_replace.set_num_threads(50) + + # Add data + print("Adding data") + hnsw_index_with_replace.add_items(data1, labels1) + hnsw_index_with_replace.add_items(data2, labels2) # maximum number of elements is reached + bf_index.add_items(data1, labels1) + bf_index.add_items(data3, labels3) # maximum number of elements is reached + + for l in labels2: + hnsw_index_with_replace.mark_deleted(l) + hnsw_index_with_replace.add_items(data3, labels3, replace_deleted=True) + + hnsw_index_no_replace.add_items(data1, labels1) + hnsw_index_no_replace.add_items(data3, labels3) # maximum number of elements is reached + + # Query the elements and measure recall: + labels_hnsw_with_replace, _ = hnsw_index_with_replace.knn_query(query_data, k) + labels_hnsw_no_replace, _ = hnsw_index_no_replace.knn_query(query_data, k) + labels_bf, distances_bf = bf_index.knn_query(query_data, k) + + # Measure recall + correct_with_replace = 0 + correct_no_replace = 0 + for i in range(query_size): + for label in labels_hnsw_with_replace[i]: + for correct_label in labels_bf[i]: + if label == correct_label: + correct_with_replace += 1 + break + for label in labels_hnsw_no_replace[i]: + for correct_label in labels_bf[i]: + if label == correct_label: + correct_no_replace += 1 + break + + recall_with_replace = float(correct_with_replace) / (k*query_size) + recall_no_replace = float(correct_no_replace) / (k*query_size) + print("recall with replace:", recall_with_replace) + print("recall without replace:", recall_no_replace) + + recall_diff = abs(recall_with_replace - recall_with_replace) + + self.assertGreater(recall_no_replace, recall_threshold) + self.assertLess(recall_diff, max_recall_diff) From 646bfda3447b277c00ece3262ddf6d4696d4d8c2 Mon Sep 17 00:00:00 2001 From: Dmitry Yashunin Date: Sun, 8 Jan 2023 13:52:32 +0400 Subject: [PATCH 26/27] Refactoring of test --- examples/multiThread_replace_test.cpp | 5 ----- 1 file changed, 5 deletions(-) diff --git a/examples/multiThread_replace_test.cpp b/examples/multiThread_replace_test.cpp index d5c68e24..83ed2826 100644 --- a/examples/multiThread_replace_test.cpp +++ b/examples/multiThread_replace_test.cpp @@ -90,24 +90,19 @@ int main() { int iter = 0; while (iter < 200) { - std::cout << iter << std::endl; - hnswlib::HierarchicalNSW* alg_hnsw = new hnswlib::HierarchicalNSW(&space, max_elements, 16, 200, 123, true); // add batch1 data - std::cout << "Building index\n"; ParallelFor(0, max_elements, num_threads, [&](size_t row, size_t threadId) { alg_hnsw->addPoint((void*)(batch1 + d * row), row); }); // delete half random elements of batch1 data - std::cout << "Deleting\n"; for (int i = 0; i < num_elements; i++) { alg_hnsw->markDelete(rand_labels[i]); } // replace deleted elements with batch2 data - std::cout << "Updating elements\n"; ParallelFor(0, num_elements, num_threads, [&](size_t row, size_t threadId) { int label = rand_labels[row] + max_elements; alg_hnsw->addPoint((void*)(batch2 + d * row), label, true); From 1be2fea8711c7fb470128aeec27d007689362d84 Mon Sep 17 00:00:00 2001 From: Dmitry Yashunin Date: Thu, 12 Jan 2023 13:42:33 +0400 Subject: [PATCH 27/27] Update readme and refactoring --- README.md | 117 ++++++++++++++++-- hnswlib/hnswalg.h | 2 + python_bindings/tests/bindings_test_filter.py | 2 +- .../tests/bindings_test_stress_mt_replace.py | 2 +- 4 files changed, 113 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index c86e4391..c0b0dbcc 100644 --- a/README.md +++ b/README.md @@ -54,19 +54,22 @@ For other spaces use the nmslib library https://github.com/nmslib/nmslib. * `hnswlib.Index(space, dim)` creates a non-initialized index an HNSW in space `space` with integer dimension `dim`. `hnswlib.Index` methods: -* `init_index(max_elements, M = 16, ef_construction = 200, random_seed = 100)` initializes the index from with no elements. +* `init_index(max_elements, M = 16, ef_construction = 200, random_seed = 100, allow_replace_deleted = False)` initializes the index from with no elements. * `max_elements` defines the maximum number of elements that can be stored in the structure(can be increased/shrunk). * `ef_construction` defines a construction time/accuracy trade-off (see [ALGO_PARAMS.md](ALGO_PARAMS.md)). * `M` defines tha maximum number of outgoing connections in the graph ([ALGO_PARAMS.md](ALGO_PARAMS.md)). + * `allow_replace_deleted` enables replacing of deleted elements with new added ones. -* `add_items(data, ids, num_threads = -1)` - inserts the `data`(numpy array of vectors, shape:`N*dim`) into the structure. +* `add_items(data, ids, num_threads = -1, replace_deleted = False)` - inserts the `data`(numpy array of vectors, shape:`N*dim`) into the structure. * `num_threads` sets the number of cpu threads to use (-1 means use default). * `ids` are optional N-size numpy array of integer labels for all elements in `data`. - If index already has the elements with the same labels, their features will be updated. Note that update procedure is slower than insertion of a new element, but more memory- and query-efficient. + * `replace_deleted` replaces deleted elements. Note it allows to save memory. + - to use it `init_index` should be called with `allow_replace_deleted=True` * Thread-safe with other `add_items` calls, but not with `knn_query`. * `mark_deleted(label)` - marks the element as deleted, so it will be omitted from search results. Throws an exception if it is already deleted. -* + * `unmark_deleted(label)` - unmarks the element as deleted, so it will be not be omitted from search results. * `resize_index(new_size)` - changes the maximum capacity of the index. Not thread safe with `add_items` and `knn_query`. @@ -74,13 +77,15 @@ For other spaces use the nmslib library https://github.com/nmslib/nmslib. * `set_ef(ef)` - sets the query time accuracy/speed trade-off, defined by the `ef` parameter ( [ALGO_PARAMS.md](ALGO_PARAMS.md)). Note that the parameter is currently not saved along with the index, so you need to set it manually after loading. -* `knn_query(data, k = 1, num_threads = -1)` make a batch query for `k` closest elements for each element of the +* `knn_query(data, k = 1, num_threads = -1, filter = None)` make a batch query for `k` closest elements for each element of the * `data` (shape:`N*dim`). Returns a numpy array of (shape:`N*k`). * `num_threads` sets the number of cpu threads to use (-1 means use default). + * `filter` filters elements by its labels, returns elements with allowed ids * Thread-safe with other `knn_query` calls, but not with `add_items`. -* `load_index(path_to_index, max_elements = 0)` loads the index from persistence to the uninitialized index. +* `load_index(path_to_index, max_elements = 0, allow_replace_deleted = False)` loads the index from persistence to the uninitialized index. * `max_elements`(optional) resets the maximum number of elements in the structure. + * `allow_replace_deleted` specifies whether the index being loaded has enabled replacing of deleted elements. * `save_index(path_to_index)` saves the index from persistence. @@ -142,7 +147,7 @@ p.add_items(data, ids) # Controlling the recall by setting ef: p.set_ef(50) # ef should always be > k -# Query dataset, k - number of closest elements (returns 2 numpy arrays) +# Query dataset, k - number of the closest elements (returns 2 numpy arrays) labels, distances = p.knn_query(data, k = 1) # Index objects support pickling @@ -155,7 +160,6 @@ print(f"Parameters passed to constructor: space={p_copy.space}, dim={p_copy.dim print(f"Index construction: M={p_copy.M}, ef_construction={p_copy.ef_construction}") print(f"Index size is {p_copy.element_count} and index capacity is {p_copy.max_elements}") print(f"Search speed/quality trade-off parameter: ef={p_copy.ef}") - ``` An example with updates after serialization/deserialization: @@ -196,7 +200,6 @@ p.set_ef(10) # By default using all available cores p.set_num_threads(4) - print("Adding first batch of %d elements" % (len(data1))) p.add_items(data1) @@ -226,6 +229,104 @@ labels, distances = p.knn_query(data, k=1) print("Recall for two batches:", np.mean(labels.reshape(-1) == np.arange(len(data))), "\n") ``` +An example with a filter: +```python +import hnswlib +import numpy as np + +dim = 16 +num_elements = 10000 + +# Generating sample data +data = np.float32(np.random.random((num_elements, dim))) + +# Declaring index +hnsw_index = hnswlib.Index(space='l2', dim=dim) # possible options are l2, cosine or ip + +# Initiating index +# max_elements - the maximum number of elements, should be known beforehand +# (probably will be made optional in the future) +# +# ef_construction - controls index search speed/build speed tradeoff +# M - is tightly connected with internal dimensionality of the data +# strongly affects the memory consumption + +hnsw_index.init_index(max_elements=num_elements, ef_construction=100, M=16) + +# Controlling the recall by setting ef: +# higher ef leads to better accuracy, but slower search +hnsw_index.set_ef(10) + +# Set number of threads used during batch search/construction +# By default using all available cores +hnsw_index.set_num_threads(4) + +print("Adding %d elements" % (len(data))) +# Added elements will have consecutive ids +hnsw_index.add_items(data, ids=np.arange(num_elements)) + +print("Querying only even elements") +# Define filter function that allows only even ids +filter_function = lambda idx: idx%2 == 0 +# Query the elements for themselves and search only for even elements: +labels, distances = hnsw_index.knn_query(data, k=1, filter=filter_function) +# labels contain only elements with even id +``` + +An example with replacing of deleted elements: +```python +import hnswlib +import numpy as np + +dim = 16 +num_elements = 1_000 +max_num_elements = 2 * num_elements + +# Generating sample data +labels1 = np.arange(0, num_elements) +data1 = np.float32(np.random.random((num_elements, dim))) # batch 1 +labels2 = np.arange(num_elements, 2 * num_elements) +data2 = np.float32(np.random.random((num_elements, dim))) # batch 2 +labels3 = np.arange(2 * num_elements, 3 * num_elements) +data3 = np.float32(np.random.random((num_elements, dim))) # batch 3 + +# Declaring index +hnsw_index = hnswlib.Index(space='l2', dim=dim) + +# Initiating index +# max_elements - the maximum number of elements, should be known beforehand +# (probably will be made optional in the future) +# +# ef_construction - controls index search speed/build speed tradeoff +# M - is tightly connected with internal dimensionality of the data +# strongly affects the memory consumption + +# Enable replacing of deleted elements +hnsw_index.init_index(max_elements=max_num_elements, ef_construction=200, M=16, allow_replace_deleted=True) + +# Controlling the recall by setting ef: +# higher ef leads to better accuracy, but slower search +hnsw_index.set_ef(10) + +# Set number of threads used during batch search/construction +# By default using all available cores +hnsw_index.set_num_threads(4) + +# Add batch 1 and 2 data +hnsw_index.add_items(data1, labels1) +hnsw_index.add_items(data2, labels2) # Note: maximum number of elements is reached + +# Delete data of batch 2 +for label in labels2: + hnsw_index.mark_deleted(label) + +# Replace deleted elements +# Maximum number of elements is reached therefore we cannot add new items, +# but we can replace the deleted ones by using replace_deleted=True +hnsw_index.add_items(data3, labels3, replace_deleted=True) +# hnsw_index contains the data of batch 1 and batch 3 only +``` + ### Bindings installation You can install from sources: diff --git a/hnswlib/hnswalg.h b/hnswlib/hnswalg.h index d7487096..7f34e62b 100644 --- a/hnswlib/hnswalg.h +++ b/hnswlib/hnswalg.h @@ -451,6 +451,8 @@ class HierarchicalNSW : public AlgorithmInterface { tableint next_closest_entry_point = selectedNeighbors.back(); { + // lock only during the update + // because during the addition the lock for cur_c is already acquired std::unique_lock lock(link_list_locks_[cur_c], std::defer_lock); if (isUpdate) { lock.lock(); diff --git a/python_bindings/tests/bindings_test_filter.py b/python_bindings/tests/bindings_test_filter.py index a0715d7c..a798e02f 100644 --- a/python_bindings/tests/bindings_test_filter.py +++ b/python_bindings/tests/bindings_test_filter.py @@ -49,7 +49,7 @@ def testRandomSelf(self): filter_function = lambda id: id%2 == 0 labels, distances = hnsw_index.knn_query(data, k=1, filter=filter_function) self.assertAlmostEqual(np.mean(labels.reshape(-1) == np.arange(len(data))), .5, 3) - # Verify that there are onle even elements: + # Verify that there are only even elements: self.assertTrue(np.max(np.mod(labels, 2)) == 0) labels, distances = bf_index.knn_query(data, k=1, filter=filter_function) diff --git a/python_bindings/tests/bindings_test_stress_mt_replace.py b/python_bindings/tests/bindings_test_stress_mt_replace.py index 0c56c595..8cd3e9bc 100644 --- a/python_bindings/tests/bindings_test_stress_mt_replace.py +++ b/python_bindings/tests/bindings_test_stress_mt_replace.py @@ -28,8 +28,8 @@ def testRandomSelf(self): labels3 = np.arange(first_id, last_id) data3 = np.float32(np.random.random((num_elements, dim))) - # Declaring index for _ in range(100): + # Declaring index hnsw_index = hnswlib.Index(space='l2', dim=dim) hnsw_index.init_index(max_elements=max_num_elements, ef_construction=200, M=16, allow_replace_deleted=True)