|
| 1 | +import argparse |
| 2 | +import json |
| 3 | +import logging |
| 4 | +import os |
| 5 | +import threading |
| 6 | +import time |
| 7 | +import traceback |
| 8 | + |
| 9 | +import colors |
| 10 | +import docker |
| 11 | +import numpy |
| 12 | +import psutil |
| 13 | + |
| 14 | +from benchmark.algorithms.definitions import (Definition, |
| 15 | + instantiate_algorithm) |
| 16 | +from benchmark.datasets import DATASETS |
| 17 | +from benchmark.results import store_results |
| 18 | + |
| 19 | + |
def run_individual_query(algo, X, distance, count, run_count, search_type):
    """Execute the query set against ``algo`` ``run_count`` times, keeping the
    fastest wall-clock time.

    Parameters
    ----------
    algo : algorithm wrapper exposing ``query``/``range_query``,
        ``get_results()`` and ``get_additional()``.
    X : sequence of query points.
    distance : name of the distance metric (recorded in the attrs only).
    count : k, the number of neighbours requested per query.
    run_count : number of timed repetitions; the minimum time is reported.
    search_type : ``"knn"`` selects ``algo.query``; anything else selects
        ``algo.range_query``.

    Returns
    -------
    (attrs, results) : ``attrs`` is a dict of run metadata; ``results`` are
        whatever the algorithm reported after the *last* run (or ``None`` if
        ``run_count`` is 0 — the original raised NameError in that case).
    """
    best_search_time = float('inf')
    results = None  # defined even when run_count == 0
    for i in range(run_count):
        print('Run %d/%d...' % (i + 1, run_count))

        # perf_counter() is monotonic and high-resolution; time.time() can
        # jump backwards under wall-clock adjustment and skew the benchmark.
        start = time.perf_counter()
        if search_type == "knn":
            algo.query(X, count)
        else:
            algo.range_query(X, count)
        total = time.perf_counter() - start

        # Only the last run's results are kept; timing uses the best run.
        results = algo.get_results()
        best_search_time = min(best_search_time, total)

    attrs = {
        "best_search_time": best_search_time,
        "name": str(algo),
        "run_count": run_count,
        "distance": distance,
        "type": search_type,
        "count": int(count)
    }
    # Algorithms may attach arbitrary extra metadata (e.g. candidate counts).
    attrs.update(algo.get_additional())
    return (attrs, results)
| 49 | + |
| 50 | + |
def run(definition, dataset, count, run_count):
    """Instantiate the algorithm described by ``definition``, load or build
    its index for ``dataset``, then execute every query argument group and
    persist the results via ``store_results``.

    The algorithm's ``done()`` hook is always invoked, even on failure.
    """
    algo = instantiate_algorithm(definition)
    assert not definition.query_argument_groups \
        or hasattr(algo, "set_query_arguments"), """\
error: query argument groups have been specified for %s.%s(%s), but the \
algorithm instantiated from it does not implement the set_query_arguments \
function""" % (definition.module, definition.constructor, definition.arguments)

    ds = DATASETS[dataset]
    queries = ds.get_queries()
    metric = ds.distance()
    search_type = ds.search_type()
    print(f"Running {definition.algorithm} on {dataset}")
    print(f"Got {len(queries)} queries")

    try:
        mem_before = algo.get_memory_usage()
        if not algo.load_index(dataset):
            # No prebuilt index available -- build one now and report timing.
            build_start = time.time()
            algo.fit(dataset)
            build_time = time.time() - build_start
            print('Built index in', build_time)

        index_size = algo.get_memory_usage() - mem_before
        print('Index size: ', index_size)

        # Algorithms with no query argument groups still run exactly once,
        # driven by a single empty, harmless group.
        groups = definition.query_argument_groups or [[]]

        for pos, group in enumerate(groups, 1):
            print("Running query argument group %d of %d..." %
                  (pos, len(groups)))
            if group:
                algo.set_query_arguments(*group)
            descriptor, results = run_individual_query(
                algo, queries, metric, count, run_count, search_type)
            # build_time is intentionally not recorded: it is undefined when
            # the index was loaded from disk rather than built here.
            descriptor["index_size"] = index_size
            descriptor["algo"] = definition.algorithm
            descriptor["dataset"] = dataset
            store_results(dataset, count, definition,
                          group, descriptor, results)
    finally:
        algo.done()
| 102 | + |
| 103 | + |
def run_from_cmdline():
    """Parse command-line arguments into a ``Definition`` and run it.

    This is the entry point used *inside* the benchmark container; end users
    normally invoke ``run.py`` instead, which launches this via Docker.
    """
    # The banner belongs in `description`, not the first positional argument
    # (`prog`), which would garble the generated usage line.
    parser = argparse.ArgumentParser(description='''

 NOTICE: You probably want to run.py rather than this script.

''')
    parser.add_argument(
        '--dataset',
        choices=DATASETS.keys(),
        help='Dataset to benchmark on.',
        required=True)
    parser.add_argument(
        '--algorithm',
        help='Name of algorithm for saving the results.',
        required=True)
    parser.add_argument(
        '--module',
        help='Python module containing algorithm. E.g. "ann_benchmarks.algorithms.annoy"',
        required=True)
    parser.add_argument(
        '--constructor',
        help='Constructor to load from module. E.g. "Annoy"',
        required=True)
    parser.add_argument(
        '--count',
        help='k: Number of nearest neighbours for the algorithm to return.',
        required=True,
        type=int)
    parser.add_argument(
        '--runs',
        help='Number of times to run the algorithm. Will use the fastest run-time over the bunch.',
        required=True,
        type=int)
    parser.add_argument(
        'build',
        help='JSON of arguments to pass to the constructor. E.g. ["angular", 100]'
    )
    parser.add_argument(
        'queries',
        help='JSON of arguments to pass to the queries. E.g. [100]',
        nargs='*',
        default=[])
    args = parser.parse_args()
    algo_args = json.loads(args.build)
    print(algo_args)
    query_args = [json.loads(q) for q in args.queries]

    definition = Definition(
        algorithm=args.algorithm,
        docker_tag=None,  # unused in this code path; we are already inside the container
        module=args.module,
        constructor=args.constructor,
        arguments=algo_args,
        query_argument_groups=query_args,
        disabled=False
    )
    run(definition, args.dataset, args.count, args.runs)
| 161 | + |
| 162 | + |
def run_docker(definition, dataset, count, runs, timeout, cpu_limit,
               mem_limit=None):
    """Run one benchmark definition inside its Docker container.

    Builds the in-container command line, starts the container with the
    benchmark/data/results directories mounted, streams its logs into a
    per-container logger, waits up to ``timeout`` seconds for completion,
    and always force-removes the container afterwards.

    Parameters
    ----------
    definition : Definition with ``docker_tag`` set.
    dataset, count, runs : forwarded to the in-container CLI.
    timeout : seconds to wait for the container to finish.
    cpu_limit : value for Docker's ``cpuset_cpus``.
    mem_limit : container memory cap; defaults to all currently available
        host memory.
    """
    cmd = ['--dataset', dataset,
           '--algorithm', definition.algorithm,
           '--module', definition.module,
           '--constructor', definition.constructor,
           '--runs', str(runs),
           '--count', str(count)]
    cmd.append(json.dumps(definition.arguments))
    cmd += [json.dumps(qag) for qag in definition.query_argument_groups]

    client = docker.from_env()
    if mem_limit is None:
        mem_limit = psutil.virtual_memory().available

    container = client.containers.run(
        definition.docker_tag,
        cmd,
        volumes={
            os.path.abspath('benchmark'):
                {'bind': '/home/app/benchmark', 'mode': 'ro'},
            os.path.abspath('data'):
                {'bind': '/home/app/data', 'mode': 'rw'},
            os.path.abspath('results'):
                {'bind': '/home/app/results', 'mode': 'rw'},
        },
        cpuset_cpus=cpu_limit,
        mem_limit=mem_limit,
        detach=True)
    logger = logging.getLogger(f"annb.{container.short_id}")

    # Lazy %-args: the message is only formatted if this level is enabled.
    logger.info('Created container %s: CPU limit %s, mem limit %s, timeout %d, command %s',
                container.short_id, cpu_limit, mem_limit, timeout, cmd)

    def stream_logs():
        # Relay container output into our logger, coloured for readability.
        for line in container.logs(stream=True):
            logger.info(colors.color(line.decode().rstrip(), fg='blue'))

    t = threading.Thread(target=stream_logs, daemon=True)
    t.start()

    try:
        exit_code = container.wait(timeout=timeout)

        # NOTE(review): docker-py >= 3 returns a dict such as
        # {'StatusCode': 0} from wait(), in which case this membership test
        # always fires -- confirm against the pinned docker client version.
        if exit_code not in [0, None]:
            logger.error(colors.color(container.logs().decode(), fg='red'))
            logger.error('Child process for container %s raised exception %d',
                         container.short_id, exit_code)
    except Exception:
        # Was a bare `except:`, which would also swallow KeyboardInterrupt
        # and SystemExit; those should propagate.
        logger.error('Container.wait for container %s failed with exception',
                     container.short_id)
        logger.error('Invoked with %s', cmd)
        traceback.print_exc()
    finally:
        container.remove(force=True)
0 commit comments