Skip to content

Commit 351bf79

Browse files
Fix locking errors in streaming code (#214)
* report number of process_delete calls in consolidation_report * correct locking comments * convert _delete_set member in index class to a unique_ptr * Add shared delete_lock to process_delete; move adj list lock inside process_delete * insert acqires exclusive delete_lock before calling reserve_location(); change comments on locking requirements for calling reserve_location() * In search_for_points_and_add_links(), remove unnecessary checks of pool against delete_set. The function already checks the final results against _location_to_tag * Copy by reference when scratch spaces are returned * use reference to iterate over vector when modifying it's contents * minor changes to help compiler optimize * acquire shared delete lock to check delete set size * change locking in process_delete to avoid deadlock * remove _lazy_done member from Index class. It can be inferred from _delete_set.size() * Add locks to compact_data; remove _delete_set->clear since it's supposed to be empty here; use iterators to renumber edges * In process_delete, clear _final_graph[loc] before passing as output buffer to occlude_list * Use a set to collect unique nodes for candidate pool to pass to occlude_list * throw an exception if conslidate_delete or compact_data finds that the _start node has been deleted * add shared locks to print_status in Index class * fix to check on deleted start point * acquire tag and delete lock in enable delete * In process_delete, renamed iterator variable and conditioned the call to occlude_list on the candidate pool being larger than R * In process_delete, acquire lock on adj list before clearing it * Refactor protected member search_for_points_and_add_links so that tag_lock is moved to public functions * acquire _locks[loc] later in process_delete control flow * pre-allocating scratch space needed in process_delete
1 parent 032cab4 commit 351bf79

File tree

4 files changed

+307
-213
lines changed

4 files changed

+307
-213
lines changed

include/index.h

Lines changed: 62 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -45,22 +45,31 @@ namespace diskann {
4545
};
4646
status_code _status;
4747
size_t _active_points, _max_points, _empty_slots, _slots_released,
48-
_delete_set_size;
48+
_delete_set_size, _num_calls_to_process_delete;
4949
double _time;
5050

5151
consolidation_report(status_code status, size_t active_points,
5252
size_t max_points, size_t empty_slots,
5353
size_t slots_released, size_t delete_set_size,
54-
double time_secs)
54+
size_t num_calls_to_process_delete, double time_secs)
5555
: _status(status), _active_points(active_points),
5656
_max_points(max_points), _empty_slots(empty_slots),
5757
_slots_released(slots_released), _delete_set_size(delete_set_size),
58+
_num_calls_to_process_delete(num_calls_to_process_delete),
5859
_time(time_secs) {
5960
}
6061
};
6162

6263
template<typename T, typename TagT = uint32_t>
6364
class Index {
65+
/**************************************************************************
66+
*
67+
* Public functions acquire one or more of _update_lock, _consolidate_lock,
68+
* _tag_lock, _delete_lock before calling protected functions which DO NOT
69+
* acquire these locks. They might acquire locks on _locks[i]
70+
*
71+
**************************************************************************/
72+
6473
public:
6574
// Constructor for Bulk operations and for creating the index object solely
6675
// for loading a prexisting index.
@@ -125,7 +134,7 @@ namespace diskann {
125134
// Set starting point to a random point on a sphere of certain radius
126135
DISKANN_DLLEXPORT void set_start_point_at_random(T radius);
127136

128-
// For Bulk Index FastL2 search, we interleave the data with graph
137+
// For FastL2 search on a static index, we interleave the data with graph
129138
DISKANN_DLLEXPORT void optimize_index_layout();
130139

131140
// For FastL2 search on optimized layout
@@ -183,9 +192,9 @@ namespace diskann {
183192
// memory should be allocated for vec before calling this function
184193
DISKANN_DLLEXPORT int get_vector_by_tag(TagT &tag, T *vec);
185194

186-
DISKANN_DLLEXPORT void print_status() const;
195+
DISKANN_DLLEXPORT void print_status();
187196

188-
DISKANN_DLLEXPORT void count_nodes_at_bfs_levels() const;
197+
DISKANN_DLLEXPORT void count_nodes_at_bfs_levels();
189198

190199
// This variable MUST be updated if the number of entries in the metadata
191200
// change.
@@ -203,23 +212,25 @@ namespace diskann {
203212
Index<T, TagT> &operator=(const Index<T, TagT> &) = delete;
204213

205214
// Use after _data and _nd have been populated
215+
// Acquire exclusive _update_lock before calling
206216
void build_with_data_populated(Parameters &parameters,
207217
const std::vector<TagT> &tags);
208218

209219
// generates 1 frozen point that will never be deleted from the graph
210220
// This is not visible to the user
211221
int generate_frozen_point();
212222

213-
// determines navigating node of the graph by calculating medoid of data
223+
// determines navigating node of the graph by calculating medoid of datafopt
214224
unsigned calculate_entry_point();
215225

216226
std::pair<uint32_t, uint32_t> iterate_to_fixed_point(
217227
const T *node_coords, const unsigned Lindex,
218228
const std::vector<unsigned> &init_ids, InMemQueryScratch<T> *scratch,
219229
bool ret_frozen = true, bool search_invocation = false);
220230

221-
void search_for_point_and_add_links(int location, _u32 Lindex,
222-
InMemQueryScratch<T> *scratch);
231+
void search_for_point_and_prune(int location, _u32 Lindex,
232+
std::vector<unsigned> &pruned_list,
233+
InMemQueryScratch<T> *scratch);
223234

224235
void prune_neighbors(const unsigned location, std::vector<Neighbor> &pool,
225236
std::vector<unsigned> &pruned_list,
@@ -230,6 +241,8 @@ namespace diskann {
230241
const float alpha, std::vector<unsigned> &pruned_list,
231242
InMemQueryScratch<T> *scratch);
232243

244+
// Prunes candidates in @pool to a shorter list @result
245+
// @pool must be sorted before calling
233246
void occlude_list(
234247
const unsigned location, std::vector<Neighbor> &pool, const float alpha,
235248
const unsigned degree, const unsigned maxc,
@@ -243,30 +256,34 @@ namespace diskann {
243256
void inter_insert(unsigned n, std::vector<unsigned> &pruned_list,
244257
InMemQueryScratch<T> *scratch);
245258

259+
// Acquire exclusive _update_lock before calling
246260
void link(Parameters &parameters);
247261

248-
// Acquire _tag_lock before calling
249-
int reserve_location();
262+
// Acquire exclusive _tag_lock and _delete_lock before calling
263+
int reserve_location();
264+
265+
// Acquire exclusive _tag_lock before calling
250266
size_t release_location(int location);
251-
size_t release_locations(tsl::robin_set<unsigned> &locations);
267+
size_t release_locations(const tsl::robin_set<unsigned> &locations);
252268

253269
// Resize the index when no slots are left for insertion.
254-
// MUST acquire _num_points_lock and _update_lock before calling.
270+
// Acquire exclusive _update_lock and _tag_lock before calling.
255271
void resize(size_t new_max_points);
256272

257-
// Take an unique lock on _update_lock and _consolidate_lock
258-
// before calling these functions.
273+
// Acquire unique lock on _update_lock, _consolidate_lock, _tag_lock
274+
// and _delete_lock before calling these functions.
259275
// Renumber nodes, update tag and location maps and compact the
260276
// graph, mode = _consolidated_order in case of lazy deletion and
261277
// _compacted_order in case of eager deletion
262278
DISKANN_DLLEXPORT void compact_data();
263279
DISKANN_DLLEXPORT void compact_frozen_point();
264280

265-
// Remove deleted nodes from adj list of node i and absorb edges from
266-
// deleted neighbors Acquire _locks[i] prior to calling for thread-safety
281+
// Remove deleted nodes from adjacency list of node loc
282+
// Replace removed neighbors with second order neighbors.
283+
// Also acquires _locks[i] for i = loc and out-neighbors of loc.
267284
void process_delete(const tsl::robin_set<unsigned> &old_delete_set,
268-
size_t i, const unsigned &range, const unsigned &maxc,
269-
const float &alpha, InMemQueryScratch<T> *scratch);
285+
size_t loc, const unsigned range, const unsigned maxc,
286+
const float alpha, InMemQueryScratch<T> *scratch);
270287

271288
void initialize_query_scratch(uint32_t num_threads, uint32_t search_l,
272289
uint32_t indexing_l, uint32_t r,
@@ -299,7 +316,7 @@ namespace diskann {
299316

300317
// Data
301318
T *_data = nullptr;
302-
char *_opt_graph;
319+
char *_opt_graph = nullptr;
303320

304321
// Graph related data structures
305322
std::vector<std::vector<unsigned>> _final_graph;
@@ -335,13 +352,6 @@ namespace diskann {
335352
// Query scratch data structures
336353
ConcurrentQueue<InMemQueryScratch<T> *> _query_scratch;
337354

338-
// data structures, flags and locks for dynamic indexing
339-
tsl::sparse_map<TagT, unsigned> _tag_to_location;
340-
natural_number_map<unsigned, TagT> _location_to_tag;
341-
342-
tsl::robin_set<unsigned> _delete_set;
343-
natural_number_set<unsigned> _empty_slots;
344-
345355
// Flags for PQ based distance calculation
346356
bool _pq_dist = false;
347357
bool _use_opq = false;
@@ -350,23 +360,38 @@ namespace diskann {
350360
bool _pq_generated = false;
351361
FixedChunkPQTable _pq_table;
352362

353-
bool _lazy_done = false; // true if lazy deletions have been made
363+
//
364+
// Data structures, locks and flags for dynamic indexing and tags
365+
//
366+
367+
// lazy_delete removes entry from _location_to_tag and _tag_to_location. If
368+
// _location_to_tag does not resolve a location, infer that it was deleted.
369+
tsl::sparse_map<TagT, unsigned> _tag_to_location;
370+
natural_number_map<unsigned, TagT> _location_to_tag;
371+
372+
// _empty_slots has unallocated slots and those freed by consolidate_delete.
373+
// _delete_set has locations marked deleted by lazy_delete. Will not be
374+
// immediately available for insert. consolidate_delete will release these
375+
// slots to _empty_slots.
376+
natural_number_set<unsigned> _empty_slots;
377+
std::unique_ptr<tsl::robin_set<unsigned>> _delete_set;
378+
354379
bool _data_compacted = true; // true if data has been compacted
355380
bool _is_saved = false; // Gopal. Checking if the index is already saved.
356381
bool _conc_consolidate = false; // use _lock while searching
357382

358-
// Per node lock, cardinality=max_points_
359-
std::vector<non_recursive_mutex> _locks;
360-
361-
// If acquiring multiple locks below, acquire locks in the order below
383+
// Acquire locks in the order below when acquiring multiple locks
362384
std::shared_timed_mutex // RW mutex between save/load (exclusive lock) and
363385
_update_lock; // search/inserts/deletes/consolidate (shared lock)
364-
std::shared_timed_mutex
365-
_consolidate_lock; // Ensure only one consolidate is ever active
366-
std::shared_timed_mutex
367-
_tag_lock; // RW lock for _tag_to_location and _location_to_tag
368-
std::shared_timed_mutex
369-
_delete_lock; // RW Lock on _delete_set and _empty_slots
386+
std::shared_timed_mutex // Ensure only one consolidate or compact_data is
387+
_consolidate_lock; // ever active
388+
std::shared_timed_mutex // RW lock for _tag_to_location,
389+
_tag_lock; // _location_to_tag, _empty_slots, _nd, _max_points
390+
std::shared_timed_mutex // RW Lock on _delete_set and _data_compacted
391+
_delete_lock; // variable
392+
393+
// Per node lock, cardinality=_max_points
394+
std::vector<non_recursive_mutex> _locks;
370395

371396
static const float INDEX_GROWTH_FACTOR;
372397
};

include/scratch.h

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,12 @@ namespace diskann {
4848
inline uint32_t get_maxc() {
4949
return _maxc;
5050
}
51-
5251
inline T *aligned_query() {
5352
return _aligned_query;
5453
}
5554
inline PQScratch<T> *pq_scratch() {
5655
return _pq_scratch;
5756
}
58-
5957
inline std::vector<Neighbor> &pool() {
6058
return _pool;
6159
}
@@ -65,7 +63,6 @@ namespace diskann {
6563
inline std::vector<float> &occlude_factor() {
6664
return _occlude_factor;
6765
}
68-
6966
inline tsl::robin_set<unsigned> &inserted_into_pool_rs() {
7067
return _inserted_into_pool_rs;
7168
}
@@ -78,6 +75,15 @@ namespace diskann {
7875
inline std::vector<float> &dist_scratch() {
7976
return _dist_scratch;
8077
}
78+
inline tsl::robin_set<unsigned> &expanded_nodes_set() {
79+
return _expanded_nodes_set;
80+
}
81+
inline std::vector<Neighbor> &expanded_nodes_vec() {
82+
return _expanded_nghrs_vec;
83+
}
84+
inline std::vector<unsigned> &occlude_list_output() {
85+
return _occlude_list_output;
86+
}
8187

8288
private:
8389
uint32_t _L;
@@ -116,6 +122,11 @@ namespace diskann {
116122
// _dist_scratch must be > R*GRAPH_SLACK_FACTOR for iterate_to_fp
117123
// _dist_scratch should be at least the size of id_scratch
118124
std::vector<float> _dist_scratch;
125+
126+
// Buffers used in process delete, capacity increases as needed
127+
tsl::robin_set<unsigned> _expanded_nodes_set;
128+
std::vector<Neighbor> _expanded_nghrs_vec;
129+
std::vector<unsigned> _occlude_list_output;
119130
};
120131

121132
//

0 commit comments

Comments
 (0)