Commit 8674ce9

Authored Oct 18, 2023
Harshasi/final runbook (harsha-simhadri#177)
* new classes, methods and config
* add runbook
* edit runbook_gen
* add final runbook
* remove data file in GT after calculation is done
* h5py libver latest
* set h5py dataset directly with 2D np array
* set k=10 and run_count to 1
* add final runbook to data_export
* reduce streaming runtime to 1hr
* try different configs
* add 8GB memory limit for streaming
* add README instructions
* make download_gt work
* update streaming runtime
1 parent f5bb90b commit 8674ce9

11 files changed: +4,099 −22 lines
 

‎benchmark/datasets.py

+30

@@ -193,6 +193,13 @@ def get_dataset_iterator(self, bs=512, split=(1,0)):
             j1 = min(j0 + bs, i1)
             yield sanitize(x[j0:j1])
 
+    def get_data_in_range(self, start, end):
+        assert start >= 0
+        assert end <= self.nb
+        filename = self.get_dataset_fn()
+        x = xbin_mmap(filename, dtype=self.dtype, maxn=self.nb)
+        return x[start:end]
+
     def search_type(self):
         return "knn"

@@ -434,6 +441,28 @@ def distance(self):
 
     def prepare(self, skip_data=False, original_size=10 ** 9):
         return super().prepare(skip_data, original_size = self.nb)
+
+class MSTuringClustered30M(DatasetCompetitionFormat):
+    def __init__(self):
+        self.nb = 29998994
+        self.d = 100
+        self.nq = 10000
+        self.dtype = "float32"
+        self.ds_fn = "30M-clustered64.fbin"
+        self.qs_fn = "testQuery10K.fbin"
+        self.gt_fn = "clu_msturing30M_gt100"
+
+        self.base_url = "https://comp21storage.blob.core.windows.net/publiccontainer/comp23/clustered_data/msturing-30M-clustered/"
+        self.basedir = os.path.join(BASEDIR, "MSTuring-30M-clustered")
+
+        self.private_gt_url = None
+        self.private_qs_url = None
+
+    def distance(self):
+        return "euclidean"
+
+    def prepare(self, skip_data=False, original_size=10 ** 9):
+        return super().prepare(skip_data, original_size = self.nb)
 
 class MSSPACEV1B(DatasetCompetitionFormat):
     def __init__(self, nb_M=1000):

@@ -984,6 +1013,7 @@ def __str__(self):
     'msturing-1M': lambda : MSTuringANNS(1),
 
     'msturing-10M-clustered': lambda: MSTuringClustered10M(),
+    'msturing-30M-clustered': lambda: MSTuringClustered30M(),
 
     'msspacev-1B': lambda : MSSPACEV1B(1000),
     'msspacev-100M': lambda : MSSPACEV1B(100),
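The new `get_data_in_range` reads a contiguous row range through the same `xbin_mmap` memory map the dataset iterator uses, so callers can pull only the rows a runbook step needs instead of materializing the whole base file. A minimal usage sketch, assuming the dataset files are already downloadable; the dataset name and slice bounds are illustrative:

```python
from benchmark.datasets import DATASETS

# Illustrative: any dataset exposing the new method works the same way.
ds = DATASETS['msturing-30M-clustered']()
ds.prepare()  # fetches base/query/GT files on first use

# Rows [1_000_000, 1_100_000) come from a memory-mapped view,
# so the full 30M-point file is never loaded into RAM.
chunk = ds.get_data_in_range(1_000_000, 1_100_000)
print(chunk.shape, chunk.dtype)  # expected: (100000, 100) float32
```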

‎benchmark/results.py

+3 −6

@@ -6,7 +6,6 @@
 import re
 import traceback
 
-
 def get_result_filename(dataset=None, count=None, definition=None,
                         query_arguments=None, neurips23track=None, runbook_path=None):
     d = ['results']

@@ -41,9 +40,7 @@ def get_result_filename(dataset=None, count=None, definition=None,
 
 def add_results_to_h5py(f, search_type, results, count, suffix = ''):
     if search_type == "knn" or search_type == "knn_filtered":
-        neighbors = f.create_dataset('neighbors' + suffix, (len(results), count), 'i')
-        for i, idxs in enumerate(results):
-            neighbors[i] = idxs
+        neighbors = f.create_dataset('neighbors' + suffix, (len(results), count), 'i', data = results)
     elif search_type == "range":
         lims, D, I= results
         f.create_dataset('neighbors' + suffix, data=I)

@@ -59,7 +56,7 @@ def store_results(dataset, count, definition, query_arguments,
     head, tail = os.path.split(fn)
     if not os.path.isdir(head):
         os.makedirs(head)
-    f = h5py.File(fn, 'w')
+    f = h5py.File(name=fn, mode='w', libver='latest')
     for k, v in attrs.items():
         f.attrs[k] = v
 

@@ -83,7 +80,7 @@ def load_all_results(dataset=None, count=None, neurips23track=None, runbook_path
             if os.path.splitext(fn)[-1] != '.hdf5':
                 continue
             try:
-                f = h5py.File(os.path.join(root, fn), 'r+')
+                f = h5py.File(name=os.path.join(root, fn), mode='r+', libver='latest')
                 properties = dict(f.attrs)
                 yield properties, f
                 f.close()
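In `add_results_to_h5py`, the per-row copy loop is replaced by a single `create_dataset(..., data=results)` call, so h5py ingests the whole 2D neighbor array at once, and result files are now opened with `libver='latest'`. A standalone sketch of the same pattern; the file name and array contents are made up:

```python
import h5py
import numpy as np

# Fake (nq, k) neighbor-id matrix standing in for the per-step search results.
results = np.random.randint(0, 30_000_000, size=(10_000, 10))

with h5py.File('example_results.hdf5', mode='w', libver='latest') as f:
    # One call writes the whole 2D array; no per-row assignment loop needed.
    f.create_dataset('neighbors', results.shape, 'i', data=results)
    f.attrs['count'] = 10
```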

‎benchmark/runner.py

+4 −2

@@ -100,7 +100,7 @@ def run(definition, dataset, count, run_count, rebuild,
         algo.set_query_arguments(*query_arguments)
         if neurips23track == 'streaming':
             descriptor, results = custom_runner.run_task(
-                algo, ds, distance, 1, run_count, search_type, private_query, runbook)
+                algo, ds, distance, count, 1, search_type, private_query, runbook)
         else:
             descriptor, results = custom_runner.run_task(
                 algo, ds, distance, count, run_count, search_type, private_query)

@@ -116,9 +116,11 @@
             X = ds.get_private_queries()
             power_stats = power_capture.run(algo, X, distance, count,
                                             run_count, search_type, descriptor)
+        print('start store results')
         store_results(dataset, count, definition,
                       query_arguments, descriptor,
                       results, search_type, neurips23track, runbook_path)
+        print('end store results')
     finally:
         algo.done()

@@ -263,7 +265,7 @@ def run_docker(definition, dataset, count, runs, timeout, rebuild,
     client = docker.from_env()
     if mem_limit is None:
-        mem_limit = psutil.virtual_memory().available
+        mem_limit = psutil.virtual_memory().available if neurips23track != 'streaming' else (8*1024*1024*1024)
 
     # ready the container object invoked later in this function
     container = None
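`run_docker` now caps streaming-track containers at 8 GiB instead of granting all available host memory, matching the DRAM limit stated in the README. A hedged sketch of how such a limit can be handed to docker-py (`mem_limit` is a real `containers.run` parameter, but the image tag and command below are placeholders, not the runner's actual invocation):

```python
import docker
import psutil

neurips23track = 'streaming'  # illustrative
mem_limit = (psutil.virtual_memory().available
             if neurips23track != 'streaming'
             else 8 * 1024 * 1024 * 1024)  # 8 GiB cap for the streaming track

client = docker.from_env()
container = client.containers.run(
    image='neurips23-streaming-diskann',   # placeholder image tag
    command='python3 run_algorithm.py',    # placeholder command
    mem_limit=mem_limit,                   # enforced by Docker as a hard memory cap
    detach=True,
)
print(container.id)
```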

‎benchmark/streaming/compute_gt.py

+4 −1

@@ -26,7 +26,7 @@ def gt_dir(ds, runbook_path):
     return os.path.join(ds.basedir, str(ds.nb), runbook_filename)
 
 def output_gt(ds, ids, step, gt_cmdline, runbook_path):
-    data = ds.get_dataset()
+    data = ds.get_data_in_range(0, ds.nb)
     data_slice = data[ids]
 
     dir = gt_dir(ds, runbook_path)

@@ -52,6 +52,9 @@ def output_gt(ds, ids, step, gt_cmdline, runbook_path):
     gt_cmdline += ' --tags_file ' + tags_file
     print("Executing cmdline: ", gt_cmdline)
     os.system(gt_cmdline)
+    print("Removing data file")
+    rm_cmdline = "rm " + data_file
+    os.system(rm_cmdline)
 
 
 def main():
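`output_gt` now deletes the temporary data file once the ground-truth tool has finished, by shelling out to `rm`. An equivalent shell-free cleanup, shown only as a sketch (the path below is a placeholder; the real one is assembled inside `output_gt`):

```python
import os

data_file = '/tmp/runbook_step.data'  # placeholder path

if os.path.exists(data_file):
    os.remove(data_file)  # same effect as `rm <file>`, without spawning a shell
    print("Removed data file", data_file)
```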

‎data_export.py

+4 −1

@@ -95,7 +95,10 @@ def cleaned_run_metric(run_metrics):
         print(f"Looking at track:{track}, dataset:{dataset_name}")
         dataset = DATASETS[dataset_name]()
         if track == 'streaming':
-            for runbook_path in ['neurips23/streaming/simple_runbook.yaml', 'neurips23/streaming/clustered_runbook.yaml', 'neurips23/streaming/delete_runbook.yaml']:
+            for runbook_path in ['neurips23/streaming/simple_runbook.yaml',
+                                 'neurips23/streaming/clustered_runbook.yaml',
+                                 'neurips23/streaming/delete_runbook.yaml',
+                                 'neurips23/streaming/final_runbook.yaml']:
                 results = load_all_results(dataset_name, neurips23track=track, runbook_path=runbook_path)
                 run_metrics = compute_metrics_all_runs(dataset, dataset_name, results, args.recompute, \
                                                        args.sensors, args.search_times, args.private_query, \

‎neurips23/README.md

+8 −3

@@ -23,7 +23,7 @@ The Practical Vector Search challenge at NeurIPS 2023 has four different tasks:
 The tags are from a vocabulary of 200386 possible tags.
 The 100,000 queries consist of one image embedding and one or two tags that must appear in the database elements to be considered.
 
-**Task Streaming:** This task uses 10M slice of the MS Turing data set released in the previous challenge. The index starts with zero points and must implement the "runbook" provided - a sequence of insertion operations, deletion operations, and search commands (roughly 4:4:1 ratio) - within a time bound of 1 hour. In the final run, we will use a different runbook, and possibly a different data set, to avoid participants over-fitting to this dataset. Entries will be ranked by average recall over queries at all check points. The intention is for the algorithm to process the operations and maintain a compact index over the active points rather than index the entire anticipated set of points and use tombstones or flags to mark active elements.
+**Task Streaming:** This task uses 10M slice of the MS Turing data set released in the previous challenge. The index starts with zero points and must implement the "runbook" provided - a sequence of insertion operations, deletion operations, and search commands (roughly 4:4:1 ratio) - within a time bound of 1 hour and a DRAM limit of 8GB. Entries will be ranked by average recall over queries at all check points. The intention is for the algorithm to process the operations and maintain a compact index over the active points rather than index the entire anticipated set of points and use tombstones or flags to mark active elements. ~~In the final run, we will use a different runbook, and possibly a different data set, to avoid participants over-fitting to this dataset.~~ The final run will use `msturing-30M-clustered`, a 30M slice of the MSTuring dataset, and the `final_runbook.yaml` runbook.
 
 **Task Out-Of-Distribution:** Yandex Text-to-Image 10M represents a cross-modal dataset where the database and query vectors have different distributions in the shared vector space.
 The base set is a 10M subset of the Yandex visual search database of 200-dimensional image embeddings which are produced with the Se-ResNext-101 model.

@@ -46,6 +46,7 @@ The baselines were run on an Azure Standard D8lds v5 (8 vcpus, 16 GiB memory) ma
 |Sparse | Linear Scan | 101 | `python3 run.py --dataset sparse-full --algorithm linscan --neurips23track sparse` |
 |Filter | faiss | 3200 | `python3 run.py --dataset yfcc-10M --algorithm faiss --neurips23track filter` |
 |Streaming| DiskANN | 0.924 (recall@10), 23 mins | `python3 run.py --dataset msturing-10M-clustered --algorithm diskann --neurips23track streaming --runbook_path neurips23/streaming/delete_runbook.yaml` |
+|Streaming| DiskANN | 0.883 (recall@10), 45 mins | `python3 run.py --dataset msturing-30M-clustered --algorithm diskann --neurips23track streaming --runbook_path neurips23/streaming/final_runbook.yaml` |
 |OOD | DiskANN | 4882 | `python3 run.py --dataset text2image-10M --algorithm diskann --neurips23track ood` |
 
 
@@ -110,13 +111,17 @@ For the competition dataset, run commands mentioned in the table above, for exam
 python run.py --neurips23track filter --algorithm faiss --dataset yfcc-10M
 python run.py --neurips23track sparse --algorithm linscan --dataset sparse-full
 python run.py --neurips23track ood --algorithm diskann --dataset text2image-10M
+# preliminary runbook for testing
 python run.py --neurips23track streaming --algorithm diskann --dataset msturing-10M-clustered --runbook_path neurips23/streaming/delete_runbook.yaml
+#Final runbook for evaluation
+python run.py --neurips23track streaming --algorithm diskann --dataset msturing-30M-clustered --runbook_path neurips23/streaming/final_runbook.yaml
 ```
 
 For streaming track, runbook specifies the order of operations to be executed by the algorithms. To download the ground truth for every search operation: (needs azcopy tool in your binary path):
 ```
-python benchmark/streaming/download_gt.py --runbook_file neurips23/streaming/simple_runbook.yaml --dataset msspacev-10M
-python benchmark/streaming/download_gt.py --runbook_file neurips23/streaming/delete_runbook.yaml --dataset msturing-10M-clustered
+python -m benchmark.streaming.download_gt --runbook_file neurips23/streaming/simple_runbook.yaml --dataset msspacev-10M
+python -m benchmark.streaming.download_gt --runbook_file neurips23/streaming/delete_runbook.yaml --dataset msturing-10M-clustered
+python -m benchmark.streaming.download_gt --runbook_file neurips23/streaming/final_runbook.yaml --dataset msturing-30M-clustered
 ```
 Alternately, to compute ground truth for an arbitrary runbook, [clone and build DiskANN repo](https://github.com/Microsoft/DiskANN) and use the command line tool to compute ground truth at various search checkpoints. The `--gt_cmdline_tool` points to the directory with DiskANN commandline tools.
 ```

‎neurips23/streaming/diskann/config.yaml

+15 −1

@@ -83,4 +83,18 @@ msturing-10M-clustered:
       args: |
         [{"R":64, "L":50, "insert_threads":16, "consolidate_threads":16}]
       query-args: |
-        [{"Ls":100, "T":16}]
+        [{"Ls":100, "T":16}]
+msturing-30M-clustered:
+  diskann:
+    docker-tag: neurips23-streaming-diskann
+    module: neurips23.streaming.diskann.diskann-str
+    constructor: diskann
+    base-args: ["@metric"]
+    run-groups:
+      base:
+        args: |
+          [{"R":32, "L":50, "insert_threads":16, "consolidate_threads":16},
+           {"R":32, "L":70, "insert_threads":16, "consolidate_threads":16},
+           {"R":50, "L":50, "insert_threads":16, "consolidate_threads":16}]
+        query-args: |
+          [{"Ls":70, "T":16}]

‎neurips23/streaming/final_runbook.yaml

+3,843 lines (large diff not rendered)
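`final_runbook.yaml` is too large to render here, but it follows the format that `create_runbook` in the runbook-generator script below emits: one top-level key per dataset, a `max_pts` bound on simultaneously active points, and numbered steps, each a list of single-key dicts describing an `insert` range, a `delete` range, or a `search`. A sketch with made-up step ranges that reproduces that shape:

```python
import yaml

# Illustrative excerpt in the generator's output format; the real file
# holds a few thousand such steps for msturing-30M-clustered.
runbook = {
    'msturing-30M-clustered': {
        'max_pts': 10_500_000,  # made-up bound on active points
        1: [{'operation': 'insert'}, {'start': 0}, {'end': 3_000_000}],
        2: [{'operation': 'search'}],
        3: [{'operation': 'delete'}, {'start': 0}, {'end': 1_500_000}],
        4: [{'operation': 'search'}],
    }
}
print(yaml.dump(runbook))
```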
+177

@@ -0,0 +1,177 @@
import argparse
import os
import numpy as np
import random
import yaml

from scipy.cluster.vq import vq, kmeans2
from typing import Tuple
from benchmark.datasets import DATASETS

def cluster_and_permute(
    data, num_clusters
) -> Tuple[np.ndarray[int], np.ndarray[int]]:
    """
    Cluster the data and return permutation of row indices
    that would group indices of the same cluster together
    """
    npts = np.shape(data)[0]
    sample_size = min(100000, npts)
    sample_indices = np.random.choice(range(npts), size=sample_size, replace=False)
    sampled_data = data[sample_indices, :]
    centroids, sample_labels = kmeans2(sampled_data, num_clusters, minit="++", iter=10)
    labels, dist = vq(data, centroids)

    count = np.zeros(num_clusters)
    for i in range(npts):
        count[labels[i]] += 1
    print("Cluster counts")
    print(count)

    offsets = np.zeros(num_clusters + 1, dtype=int)
    for i in range(0, num_clusters, 1):
        offsets[i + 1] = offsets[i] + count[i]

    permutation = np.zeros(npts, dtype=int)
    counters = np.zeros(num_clusters, dtype=int)
    for i in range(npts):
        label = labels[i]
        row = offsets[label] + counters[label]
        counters[label] += 1
        permutation[row] = i

    return offsets, permutation


def write_permuated_data(
        data,
        permutation: np.ndarray[int],
        output_data_file: str
):
    permuted_data = data[permutation, :]

    shape = np.shape(permuted_data)
    with open(output_data_file, 'wb') as df:
        df.write(shape[0].to_bytes(4, 'little'))
        df.write(shape[1].to_bytes(4, 'little'))
        df.write(permuted_data)


def create_runbook(
        dataset_str: str,
        offsets: np.ndarray[int],
        permutation: np.ndarray[int],
        num_clusters: int,
        output_yaml_file: str
):
    ins_cursor_start = offsets.copy()
    ins_cursor_end = offsets.copy()

    del_cursor_start = offsets.copy()
    del_cursor_end = offsets.copy()

    operation_list = []
    num_operations = 1
    active_points = 0
    max_pts = 0
    active_points_in_cluster = np.zeros(num_clusters)

    num_rounds = 5
    sample = np.random.default_rng().dirichlet((100, 15, 10, 5, 3), num_clusters)
    for c in range(num_clusters):
        np.random.default_rng().shuffle(sample[c])
    print(sample)

    for round in range(num_rounds):
        # insertions
        for c in range(num_clusters):
            delta = (int)((offsets[c+1] - offsets[c]) * sample[c, round])
            ins_cursor_end[c] = ins_cursor_start[c] + delta
            active_points += delta
            max_pts = max(max_pts, active_points)
            active_points_in_cluster[c] += delta
            print('ins [', ins_cursor_start[c], ', ', ins_cursor_end[c],
                  ') active:', int(active_points_in_cluster[c]),
                  'total:', active_points)
            entry = [{'operation': 'insert'}, {'start': int(ins_cursor_start[c])}, {'end': int(ins_cursor_end[c])}]
            operation_list.append((num_operations, entry))
            num_operations += 1
            operation_list.append((num_operations, [{'operation': str('search')}]))
            num_operations += 1
            ins_cursor_start[c] = ins_cursor_end[c]

        # deletions
        for c in range(num_clusters):
            fraction = random.uniform(0.5, 0.9)
            delta = (int)(fraction * (ins_cursor_end[c] - del_cursor_start[c]))
            del_cursor_end[c] = del_cursor_start[c] + delta
            active_points -= delta
            active_points_in_cluster[c] -= delta
            print('del [', del_cursor_start[c], ',', del_cursor_end[c],
                  ') active:', int(active_points_in_cluster[c]),
                  'total:', active_points)
            entry = [{'operation': 'delete'}, {'start': int(del_cursor_start[c])}, {'end': int(del_cursor_end[c])}]
            operation_list.append((num_operations, entry))
            num_operations += 1
            operation_list.append((num_operations, [{'operation': 'search'}]))
            num_operations += 1
            del_cursor_start[c] = del_cursor_end[c]

    with open(output_yaml_file, 'w') as yf:
        operation_list.sort(key=lambda x: x[0])
        sorted_dict = {}
        sorted_dict['max_pts'] = int(max_pts)
        for (k, v) in operation_list:
            sorted_dict[k] = v
        yaml_object = {}
        yaml_object[dataset_str] = sorted_dict
        yaml.dump(yaml_object, yf)


def main():
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)

    parser.add_argument(
        '--dataset',
        choices=DATASETS.keys(),
        required=True)
    parser.add_argument(
        '-c', '--num_clusters',
        type=int,
        required=True
    )
    parser.add_argument(
        '-o', '--output_data_file',
        required=True
    )
    parser.add_argument(
        '-y', '--output_yaml_file',
        required=True
    )
    args = parser.parse_args()

    ds = DATASETS[args.dataset]()
    if ds.nb <= 10**7:
        data = ds.get_dataset()
    else:
        data = next(ds.get_dataset_iterator(bs=ds.nb))
    print(np.shape(data))

    offsets, permutation = cluster_and_permute(data, args.num_clusters)
    print(permutation)

    write_permuated_data(data=data,
                         permutation=permutation,
                         output_data_file=args.output_data_file)

    create_runbook(dataset_str=args.dataset,
                   offsets=offsets,
                   permutation=permutation,
                   num_clusters=args.num_clusters,
                   output_yaml_file=args.output_yaml_file)


if __name__ == '__main__':
    main()
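The insert schedule above comes from a Dirichlet draw: for each cluster, `dirichlet((100, 15, 10, 5, 3))` yields five fractions summing to 1, the row is shuffled, and round `r` then inserts that fraction of the cluster's points. A small self-contained sketch (cluster count, sizes, and seed are arbitrary) showing how the per-round insert deltas come out:

```python
import numpy as np

num_clusters, num_rounds = 4, 5
rng = np.random.default_rng(0)  # fixed seed so the sketch is reproducible

# Shape (num_clusters, num_rounds); each row sums to 1.
sample = rng.dirichlet((100, 15, 10, 5, 3), num_clusters)
for c in range(num_clusters):
    rng.shuffle(sample[c])  # randomize which round receives the large fraction

cluster_size = 1_000_000  # pretend every cluster holds 1M points
for rnd in range(num_rounds):
    deltas = (cluster_size * sample[:, rnd]).astype(int)
    print(f"round {rnd}: insert {deltas.tolist()} points per cluster")
```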

‎neurips23/streaming/run.py

+10 −7

@@ -17,15 +17,16 @@ def build(algo, dataset, max_pts):
     algo.setup(ds.dtype, max_pts, ndims)
     print('Algorithm set up')
     return time.time() - t0
+
 
 
 def run_task(algo, ds, distance, count, run_count, search_type, private_query, runbook):
     best_search_time = float('inf')
     search_times = []
     all_results = []
 
-    data = ds.get_dataset()
-    ids = np.arange(1, ds.nb+1, dtype=np.uint32)
+    # data = ds.get_dataset()
+    # ids = np.arange(1, ds.nb+1, dtype=np.uint32)
 
     Q = ds.get_queries() if not private_query else ds.get_private_queries()
     print(fr"Got {Q.shape[0]} queries")

@@ -34,11 +35,13 @@ def run_task(algo, ds, distance, count, run_count, search_type, private_query, r
     result_map = {}
     num_searches = 0
     for step, entry in enumerate(runbook):
-        start = time.time()
+        start_time = time.time()
         match entry['operation']:
             case 'insert':
-                ids = np.arange(entry['start'], entry['end'], dtype=np.uint32)
-                algo.insert(data[ids,:], ids)
+                start = entry['start']
+                end = entry['end']
+                ids = np.arange(start, end, dtype=np.uint32)
+                algo.insert(ds.get_data_in_range(start, end), ids)
             case 'delete':
                 ids = np.arange(entry['start'], entry['end'], dtype=np.uint32)
                 algo.delete(ids)

@@ -56,7 +59,7 @@ def run_task(algo, ds, distance, count, run_count, search_type, private_query, r
                 num_searches += 1
             case _:
                 raise NotImplementedError('Invalid runbook operation.')
-        step_time = (time.time() - start)
+        step_time = (time.time() - start_time)
         print(f"Step {step+1} took {step_time}s.")
 
     attrs = {

@@ -71,7 +74,7 @@ def run_task(algo, ds, distance, count, run_count, search_type, private_query, r
 
     for k, v in result_map.items():
         attrs['step_' + str(k)] = v
-
+
     additional = algo.get_additional()
     for k in additional:
         attrs[k] = additional[k]
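`run_task` drives the algorithm object through the runbook with `insert`, `delete`, and search steps, and the task asks for a compact index over only the active points. A toy brute-force illustration of that contract (this is not the repo's streaming algorithm interface, just a sketch of the insert/delete/search cycle the runbook exercises):

```python
import numpy as np

class ToyStreamingIndex:
    """Keeps only active points; brute-force search. Illustrative, not competitive."""

    def __init__(self):
        self.vectors = {}  # id -> vector; the "compact" active set

    def insert(self, X, ids):
        for vec, i in zip(X, ids):
            self.vectors[int(i)] = np.asarray(vec, dtype=np.float32)

    def delete(self, ids):
        for i in ids:
            self.vectors.pop(int(i), None)  # active set shrinks immediately

    def search(self, q, k=10):
        ids = np.fromiter(self.vectors.keys(), dtype=np.int64)
        mat = np.stack([self.vectors[int(i)] for i in ids])
        dists = np.linalg.norm(mat - q, axis=1)  # euclidean, as in the MSTuring datasets
        return ids[np.argsort(dists)[:k]]

index = ToyStreamingIndex()
index.insert(np.random.rand(1000, 100), np.arange(1, 1001))
index.delete(np.arange(1, 501))
print(index.search(np.random.rand(100)))
```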

‎requirements_py3.10.txt

+1 −1

@@ -1,6 +1,6 @@
 ansicolors==1.1.8
 docker==6.1.2
-h5py==3.8.0
+h5py==3.10.0
 matplotlib==3.3.4
 numpy==1.24.2
 pyyaml==6.0
pyyaml==6.0
