Enabled concurrent graph creation for Lucene engine with index thread qty settings

Signed-off-by: Navneet Verma <[email protected]>
navneet1v committed Feb 1, 2025
1 parent 811ae2e commit fabd164
Showing 6 changed files with 71 additions and 20 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -33,7 +33,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Remove DocsWithFieldSet reference from NativeEngineFieldVectorsWriter (#2408)[https://github.com/opensearch-project/k-NN/pull/2408]
- Remove skip building graph check for quantization use case (#2430)[https://github.com/opensearch-project/k-NN/pull/2430]
- Removing redundant type conversions for script scoring for hamming space with binary vectors (#2351)[https://github.com/opensearch-project/k-NN/pull/2351]
- - Update default to 0 to always build graph as default behavior (#52)[https://github.com/opensearch-project/k-NN/pull/2452]
+ - Update default to 0 to always build graph as default behavior (#2452)[https://github.com/opensearch-project/k-NN/pull/2452]
+ - Enabled concurrent graph creation for Lucene engine with index thread qty settings(#2480)[https://github.com/opensearch-project/k-NN/pull/2480]
### Bug Fixes
* Fixing the bug when a segment has no vector field present for disk based vector search (#2282)[https://github.com/opensearch-project/k-NN/pull/2282]
* Fixing the bug where search fails with "fields" parameter for an index with a knn_vector field (#2314)[https://github.com/opensearch-project/k-NN/pull/2314]
8 changes: 8 additions & 0 deletions src/main/java/org/opensearch/knn/index/KNNSettings.java
@@ -737,6 +737,14 @@ public void onIndexModule(IndexModule module) {
});
}

+ /**
+ * Get the index thread quantity setting value from cluster setting.
+ * @return int
+ */
+ public static int getIndexThreadQty() {
+ return KNNSettings.state().getSettingValue(KNN_ALGO_PARAM_INDEX_THREAD_QTY);
+ }

private static String percentageAsString(Integer percentage) {
return percentage + "%";
}
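For reference, the sketch below (not part of the commit) contrasts the lookup this helper wraps with the new call form the rest of the diff switches to; both resolve the same dynamic cluster setting backing KNN_ALGO_PARAM_INDEX_THREAD_QTY (knn.algo_param.index_thread_qty). The wrapper class and method names are illustrative only.

```java
import org.opensearch.knn.index.KNNSettings;

// Illustrative wrapper, not part of the commit.
final class IndexThreadQtyLookupSketch {

    // Pre-commit style: callers went through the settings state directly.
    static int viaStateLookup() {
        return KNNSettings.state().getSettingValue(KNNSettings.KNN_ALGO_PARAM_INDEX_THREAD_QTY);
    }

    // Post-commit style: the Lucene codec, native writer, and training job call this helper.
    static int viaHelper() {
        return KNNSettings.getIndexThreadQty();
    }
}
```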
KNN9120PerFieldKnnVectorsFormat.java
@@ -7,18 +7,22 @@

import org.apache.lucene.codecs.lucene99.Lucene99HnswScalarQuantizedVectorsFormat;
import org.apache.lucene.codecs.lucene99.Lucene99HnswVectorsFormat;
+ import org.opensearch.common.collect.Tuple;
import org.opensearch.index.mapper.MapperService;
+ import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.SpaceType;
import org.opensearch.knn.index.codec.BasePerFieldKnnVectorsFormat;
import org.opensearch.knn.index.engine.KNNEngine;

import java.util.Optional;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.Executors;

/**
* Class provides per field format implementation for Lucene Knn vector type
*/
public class KNN9120PerFieldKnnVectorsFormat extends BasePerFieldKnnVectorsFormat {
- private static final int NUM_MERGE_WORKERS = 1;
+ private static final Tuple<Integer, ExecutorService> DEFAULT_MERGE_THREAD_COUNT_AND_EXECUTOR_SERVICE = Tuple.tuple(1, null);

public KNN9120PerFieldKnnVectorsFormat(final Optional<MapperService> mapperService) {
super(
@@ -27,37 +31,67 @@ public KNN9120PerFieldKnnVectorsFormat(final Optional<MapperService> mapperServi
Lucene99HnswVectorsFormat.DEFAULT_BEAM_WIDTH,
Lucene99HnswVectorsFormat::new,
knnVectorsFormatParams -> {
+ final Tuple<Integer, ExecutorService> mergeThreadCountAndExecutorService = getMergeThreadCountAndExecutorService();
// There is an assumption here that hamming space will only be used for binary vectors. This will need to be fixed if that
// changes in the future.
if (knnVectorsFormatParams.getSpaceType() == SpaceType.HAMMING) {
return new KNN9120HnswBinaryVectorsFormat(
knnVectorsFormatParams.getMaxConnections(),
- knnVectorsFormatParams.getBeamWidth()
+ knnVectorsFormatParams.getBeamWidth(),
+ // number of merge threads
+ mergeThreadCountAndExecutorService.v1(),
+ // executor service
+ mergeThreadCountAndExecutorService.v2()
);
} else {
- return new Lucene99HnswVectorsFormat(knnVectorsFormatParams.getMaxConnections(), knnVectorsFormatParams.getBeamWidth());
+ return new Lucene99HnswVectorsFormat(
+ knnVectorsFormatParams.getMaxConnections(),
+ knnVectorsFormatParams.getBeamWidth(),
+ // number of merge threads
+ mergeThreadCountAndExecutorService.v1(),
+ // executor service
+ mergeThreadCountAndExecutorService.v2()
+ );
}
},
- knnScalarQuantizedVectorsFormatParams -> new Lucene99HnswScalarQuantizedVectorsFormat(
- knnScalarQuantizedVectorsFormatParams.getMaxConnections(),
- knnScalarQuantizedVectorsFormatParams.getBeamWidth(),
- NUM_MERGE_WORKERS,
- knnScalarQuantizedVectorsFormatParams.getBits(),
- knnScalarQuantizedVectorsFormatParams.isCompressFlag(),
- knnScalarQuantizedVectorsFormatParams.getConfidenceInterval(),
- null
- )
+ knnScalarQuantizedVectorsFormatParams -> {
+ final Tuple<Integer, ExecutorService> mergeThreadCountAndExecutorService = getMergeThreadCountAndExecutorService();
+ return new Lucene99HnswScalarQuantizedVectorsFormat(
+ knnScalarQuantizedVectorsFormatParams.getMaxConnections(),
+ knnScalarQuantizedVectorsFormatParams.getBeamWidth(),
+ // Number of merge threads
+ mergeThreadCountAndExecutorService.v1(),
+ knnScalarQuantizedVectorsFormatParams.getBits(),
+ knnScalarQuantizedVectorsFormatParams.isCompressFlag(),
+ knnScalarQuantizedVectorsFormatParams.getConfidenceInterval(),
+ // Executor service
+ mergeThreadCountAndExecutorService.v2()
+ );
+ }
);
}

- @Override
+ /**
+ * This method returns the maximum dimension allowed from KNNEngine for Lucene codec
+ *
+ * @param fieldName Name of the field, ignored
+ * @return Maximum constant dimension set by KNNEngine
+ */
+ @Override
public int getMaxDimensions(String fieldName) {
return KNNEngine.getMaxDimensionByEngine(KNNEngine.LUCENE);
}

+ private static Tuple<Integer, ExecutorService> getMergeThreadCountAndExecutorService() {
+ // To ensure that only once we are fetching the settings per segment, we are fetching the num threads once while
+ // creating the executors
+ int mergeThreadCount = KNNSettings.getIndexThreadQty();
+ // We need to return null whenever the merge threads are <=1, as lucene assumes that if number of threads are 1
+ // then we should be giving a null value of the executor
+ if (mergeThreadCount <= 1) {
+ return DEFAULT_MERGE_THREAD_COUNT_AND_EXECUTOR_SERVICE;
+ } else {
+ return Tuple.tuple(mergeThreadCount, Executors.newFixedThreadPool(mergeThreadCount));
+ }
+ }
}
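To make the threading contract explicit, here is a hedged, standalone sketch of the pattern the class above relies on: Lucene's Lucene99HnswVectorsFormat takes a merge worker count plus an ExecutorService and expects a null executor when only one worker is requested, which is why getMergeThreadCountAndExecutorService falls back to Tuple.tuple(1, null). The class and method names below are illustrative; in the real code path maxConn and beamWidth come from Lucene99HnswVectorsFormat.DEFAULT_MAX_CONN and DEFAULT_BEAM_WIDTH.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.lucene.codecs.KnnVectorsFormat;
import org.apache.lucene.codecs.lucene99.Lucene99HnswVectorsFormat;

// Illustrative only: maps an index thread qty onto Lucene's
// (maxConn, beamWidth, numMergeWorkers, mergeExec) constructor, mirroring the tuple logic above.
final class ConcurrentHnswFormatSketch {

    static KnnVectorsFormat build(int maxConn, int beamWidth, int indexThreadQty) {
        if (indexThreadQty <= 1) {
            // Single-threaded merges: Lucene requires the executor to be null in this case.
            return new Lucene99HnswVectorsFormat(maxConn, beamWidth, 1, null);
        }
        // Concurrent merges: one fixed pool, sized from the index thread qty setting.
        ExecutorService mergeExec = Executors.newFixedThreadPool(indexThreadQty);
        return new Lucene99HnswVectorsFormat(maxConn, beamWidth, indexThreadQty, mergeExec);
    }
}
```

Per the comment in getMergeThreadCountAndExecutorService, the setting is read once while the format is constructed, so the pool is sized from the knn.algo_param.index_thread_qty value at that moment. The remaining hunks below switch the native-engine writer and the training job over to the same helper.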
@@ -226,7 +226,7 @@ private Map<String, Object> getParameters(FieldInfo fieldInfo, VectorDataType ve
maybeAddBinaryPrefixForFaissBWC(knnEngine, parameters, fieldAttributes);

// Used to determine how many threads to use when indexing
- parameters.put(KNNConstants.INDEX_THREAD_QTY, KNNSettings.state().getSettingValue(KNNSettings.KNN_ALGO_PARAM_INDEX_THREAD_QTY));
+ parameters.put(KNNConstants.INDEX_THREAD_QTY, KNNSettings.getIndexThreadQty());

return parameters;
}
@@ -258,7 +258,7 @@ private void maybeAddBinaryPrefixForFaissBWC(KNNEngine knnEngine, Map<String, Ob

private Map<String, Object> getTemplateParameters(FieldInfo fieldInfo, Model model) throws IOException {
Map<String, Object> parameters = new HashMap<>();
- parameters.put(KNNConstants.INDEX_THREAD_QTY, KNNSettings.state().getSettingValue(KNNSettings.KNN_ALGO_PARAM_INDEX_THREAD_QTY));
+ parameters.put(KNNConstants.INDEX_THREAD_QTY, KNNSettings.getIndexThreadQty());
parameters.put(KNNConstants.MODEL_ID, fieldInfo.attributes().get(MODEL_ID));
parameters.put(KNNConstants.MODEL_BLOB_PARAMETER, model.getModelBlob());
if (FieldInfoExtractor.extractQuantizationConfig(fieldInfo) != QuantizationConfig.EMPTY) {
5 changes: 1 addition & 4 deletions src/main/java/org/opensearch/knn/training/TrainingJob.java
@@ -179,10 +179,7 @@ public void run() {
.getKNNLibraryIndexingContext(knnMethodContext, knnMethodConfigContext);

Map<String, Object> trainParameters = libraryIndexingContext.getLibraryParameters();
- trainParameters.put(
- KNNConstants.INDEX_THREAD_QTY,
- KNNSettings.state().getSettingValue(KNNSettings.KNN_ALGO_PARAM_INDEX_THREAD_QTY)
- );
+ trainParameters.put(KNNConstants.INDEX_THREAD_QTY, KNNSettings.getIndexThreadQty());

if (libraryIndexingContext.getQuantizationConfig() != QuantizationConfig.EMPTY) {
trainParameters.put(KNNConstants.VECTOR_DATA_TYPE_FIELD, VectorDataType.BINARY.getValue());
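On the native-engine paths shown above (the writer's getParameters/getTemplateParameters and the training job), the same value travels as a plain parameter-map entry rather than an executor. A minimal sketch follows, assuming the usual KNNConstants/KNNSettings imports; the class and method names are hypothetical and the real methods populate many more entries.

```java
import java.util.HashMap;
import java.util.Map;

import org.opensearch.knn.common.KNNConstants;
import org.opensearch.knn.index.KNNSettings;

// Illustrative only: mirrors the single line the diffs above converge on.
final class NativeBuildParamsSketch {

    static Map<String, Object> withIndexThreadQty() {
        Map<String, Object> parameters = new HashMap<>();
        // Used to determine how many threads to use when indexing on the native side.
        parameters.put(KNNConstants.INDEX_THREAD_QTY, KNNSettings.getIndexThreadQty());
        return parameters;
    }
}
```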
11 changes: 11 additions & 0 deletions src/test/java/org/opensearch/knn/index/KNNSettingsTests.java
@@ -209,6 +209,17 @@ public void testGetFaissAVX2DisabledSettingValueFromConfig_enableSetting_thenVal
assertEquals(expectedKNNFaissAVX2Disabled, actualKNNFaissAVX2Disabled);
}

+ @SneakyThrows
+ public void testGetIndexThreadQty_WithDifferentValues_thenSuccess() {
+ Node mockNode = createMockNode(Map.of(KNNSettings.KNN_ALGO_PARAM_INDEX_THREAD_QTY, 3));
+ mockNode.start();
+ ClusterService clusterService = mockNode.injector().getInstance(ClusterService.class);
+ KNNSettings.state().setClusterService(clusterService);
+ int threadQty = KNNSettings.getIndexThreadQty();
+ mockNode.close();
+ assertEquals(3, threadQty);
+ }

private Node createMockNode(Map<String, Object> configSettings) throws IOException {
Path configDir = createTempDir();
File configFile = configDir.resolve("opensearch.yml").toFile();
