Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.19] Enabled concurrent graph creation for Lucene engine with index thread qty settings #2488

Merged
merged 1 commit into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Remove DocsWithFieldSet reference from NativeEngineFieldVectorsWriter (#2408)[https://github.com/opensearch-project/k-NN/pull/2408]
- Remove skip building graph check for quantization use case (#2430)[https://github.com/opensearch-project/k-NN/pull/2430]
- Removing redundant type conversions for script scoring for hamming space with binary vectors (#2351)[https://github.com/opensearch-project/k-NN/pull/2351]
- Update default to 0 to always build graph as default behavior (#52)[https://github.com/opensearch-project/k-NN/pull/2452]
- Update default to 0 to always build graph as default behavior (#2452)[https://github.com/opensearch-project/k-NN/pull/2452]
- Enabled concurrent graph creation for Lucene engine with index thread qty settings (#2480)[https://github.com/opensearch-project/k-NN/pull/2480]
### Bug Fixes
* Fixing the bug when a segment has no vector field present for disk based vector search (#2282)[https://github.com/opensearch-project/k-NN/pull/2282]
* Fixing the bug where search fails with "fields" parameter for an index with a knn_vector field (#2314)[https://github.com/opensearch-project/k-NN/pull/2314]
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/org/opensearch/knn/index/KNNSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,14 @@ public void onIndexModule(IndexModule module) {
});
}

/**
 * Reads the cluster-level index thread quantity setting
 * ({@code KNN_ALGO_PARAM_INDEX_THREAD_QTY}), which controls how many threads
 * are used when building graphs during indexing.
 *
 * @return the currently configured index thread quantity
 */
public static int getIndexThreadQty() {
    return KNNSettings.state().getSettingValue(KNN_ALGO_PARAM_INDEX_THREAD_QTY);
}

private static String percentageAsString(Integer percentage) {
return percentage + "%";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,22 @@

import org.apache.lucene.codecs.lucene99.Lucene99HnswScalarQuantizedVectorsFormat;
import org.apache.lucene.codecs.lucene99.Lucene99HnswVectorsFormat;
import org.opensearch.common.collect.Tuple;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.SpaceType;
import org.opensearch.knn.index.codec.BasePerFieldKnnVectorsFormat;
import org.opensearch.knn.index.engine.KNNEngine;

import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* Class provides per field format implementation for Lucene Knn vector type
*/
public class KNN9120PerFieldKnnVectorsFormat extends BasePerFieldKnnVectorsFormat {
private static final int NUM_MERGE_WORKERS = 1;
private static final Tuple<Integer, ExecutorService> DEFAULT_MERGE_THREAD_COUNT_AND_EXECUTOR_SERVICE = Tuple.tuple(1, null);

public KNN9120PerFieldKnnVectorsFormat(final Optional<MapperService> mapperService) {
super(
Expand All @@ -27,37 +31,67 @@ public KNN9120PerFieldKnnVectorsFormat(final Optional<MapperService> mapperServi
Lucene99HnswVectorsFormat.DEFAULT_BEAM_WIDTH,
Lucene99HnswVectorsFormat::new,
knnVectorsFormatParams -> {
final Tuple<Integer, ExecutorService> mergeThreadCountAndExecutorService = getMergeThreadCountAndExecutorService();
// There is an assumption here that hamming space will only be used for binary vectors. This will need to be fixed if that
// changes in the future.
if (knnVectorsFormatParams.getSpaceType() == SpaceType.HAMMING) {
return new KNN9120HnswBinaryVectorsFormat(
knnVectorsFormatParams.getMaxConnections(),
knnVectorsFormatParams.getBeamWidth()
knnVectorsFormatParams.getBeamWidth(),
// number of merge threads
mergeThreadCountAndExecutorService.v1(),
// executor service
mergeThreadCountAndExecutorService.v2()
);
} else {
return new Lucene99HnswVectorsFormat(knnVectorsFormatParams.getMaxConnections(), knnVectorsFormatParams.getBeamWidth());
return new Lucene99HnswVectorsFormat(
knnVectorsFormatParams.getMaxConnections(),
knnVectorsFormatParams.getBeamWidth(),
// number of merge threads
mergeThreadCountAndExecutorService.v1(),
// executor service
mergeThreadCountAndExecutorService.v2()
);
}
},
knnScalarQuantizedVectorsFormatParams -> new Lucene99HnswScalarQuantizedVectorsFormat(
knnScalarQuantizedVectorsFormatParams.getMaxConnections(),
knnScalarQuantizedVectorsFormatParams.getBeamWidth(),
NUM_MERGE_WORKERS,
knnScalarQuantizedVectorsFormatParams.getBits(),
knnScalarQuantizedVectorsFormatParams.isCompressFlag(),
knnScalarQuantizedVectorsFormatParams.getConfidenceInterval(),
null
)
knnScalarQuantizedVectorsFormatParams -> {
final Tuple<Integer, ExecutorService> mergeThreadCountAndExecutorService = getMergeThreadCountAndExecutorService();
return new Lucene99HnswScalarQuantizedVectorsFormat(
knnScalarQuantizedVectorsFormatParams.getMaxConnections(),
knnScalarQuantizedVectorsFormatParams.getBeamWidth(),
// Number of merge threads
mergeThreadCountAndExecutorService.v1(),
knnScalarQuantizedVectorsFormatParams.getBits(),
knnScalarQuantizedVectorsFormatParams.isCompressFlag(),
knnScalarQuantizedVectorsFormatParams.getConfidenceInterval(),
// Executor service
mergeThreadCountAndExecutorService.v2()
);
}
);
}

/**
 * This method returns the maximum dimension allowed from KNNEngine for Lucene codec
 *
 * @param fieldName Name of the field, ignored
 * @return Maximum constant dimension set by KNNEngine
 */
@Override
public int getMaxDimensions(String fieldName) {
    // Dimension cap is engine-wide for Lucene, independent of the field
    return KNNEngine.getMaxDimensionByEngine(KNNEngine.LUCENE);
}

/**
 * Resolves the merge thread count from the index thread quantity cluster setting and,
 * when more than one thread is configured, an executor service sized to that count for
 * Lucene's concurrent graph merges.
 *
 * @return a tuple of (merge thread count, executor service); the executor is {@code null}
 *         when the configured thread quantity is 1 or less
 */
private static Tuple<Integer, ExecutorService> getMergeThreadCountAndExecutorService() {
    // To ensure that only once we are fetching the settings per segment, we are fetching the num threads once while
    // creating the executors
    int mergeThreadCount = KNNSettings.getIndexThreadQty();
    // We need to return null whenever the merge threads are <=1, as lucene assumes that if number of threads are 1
    // then we should be giving a null value of the executor
    if (mergeThreadCount <= 1) {
        return DEFAULT_MERGE_THREAD_COUNT_AND_EXECUTOR_SERVICE;
    } else {
        // NOTE(review): a new fixed thread pool is created per call and is not shut down in
        // this method — confirm the returned executor's lifecycle is owned by the consumer
        return Tuple.tuple(mergeThreadCount, Executors.newFixedThreadPool(mergeThreadCount));
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ private Map<String, Object> getParameters(FieldInfo fieldInfo, VectorDataType ve
maybeAddBinaryPrefixForFaissBWC(knnEngine, parameters, fieldAttributes);

// Used to determine how many threads to use when indexing
parameters.put(KNNConstants.INDEX_THREAD_QTY, KNNSettings.state().getSettingValue(KNNSettings.KNN_ALGO_PARAM_INDEX_THREAD_QTY));
parameters.put(KNNConstants.INDEX_THREAD_QTY, KNNSettings.getIndexThreadQty());

return parameters;
}
Expand Down Expand Up @@ -258,7 +258,7 @@ private void maybeAddBinaryPrefixForFaissBWC(KNNEngine knnEngine, Map<String, Ob

private Map<String, Object> getTemplateParameters(FieldInfo fieldInfo, Model model) throws IOException {
Map<String, Object> parameters = new HashMap<>();
parameters.put(KNNConstants.INDEX_THREAD_QTY, KNNSettings.state().getSettingValue(KNNSettings.KNN_ALGO_PARAM_INDEX_THREAD_QTY));
parameters.put(KNNConstants.INDEX_THREAD_QTY, KNNSettings.getIndexThreadQty());
parameters.put(KNNConstants.MODEL_ID, fieldInfo.attributes().get(MODEL_ID));
parameters.put(KNNConstants.MODEL_BLOB_PARAMETER, model.getModelBlob());
if (FieldInfoExtractor.extractQuantizationConfig(fieldInfo) != QuantizationConfig.EMPTY) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,7 @@ public void run() {
.getKNNLibraryIndexingContext(knnMethodContext, knnMethodConfigContext);

Map<String, Object> trainParameters = libraryIndexingContext.getLibraryParameters();
trainParameters.put(
KNNConstants.INDEX_THREAD_QTY,
KNNSettings.state().getSettingValue(KNNSettings.KNN_ALGO_PARAM_INDEX_THREAD_QTY)
);
trainParameters.put(KNNConstants.INDEX_THREAD_QTY, KNNSettings.getIndexThreadQty());

if (libraryIndexingContext.getQuantizationConfig() != QuantizationConfig.EMPTY) {
trainParameters.put(KNNConstants.VECTOR_DATA_TYPE_FIELD, VectorDataType.BINARY.getValue());
Expand Down
11 changes: 11 additions & 0 deletions src/test/java/org/opensearch/knn/index/KNNSettingsTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,17 @@ public void testGetFaissAVX2DisabledSettingValueFromConfig_enableSetting_thenVal
assertEquals(expectedKNNFaissAVX2Disabled, actualKNNFaissAVX2Disabled);
}

/**
 * Verifies that {@link KNNSettings#getIndexThreadQty()} returns the
 * index thread quantity value configured on the node (3 here).
 */
@SneakyThrows
public void testGetIndexThreadQty_WithDifferentValues_thenSuccess() {
    Node mockNode = createMockNode(Map.of(KNNSettings.KNN_ALGO_PARAM_INDEX_THREAD_QTY, 3));
    try {
        mockNode.start();
        ClusterService clusterService = mockNode.injector().getInstance(ClusterService.class);
        KNNSettings.state().setClusterService(clusterService);
        // Read back through the convenience accessor under test
        assertEquals(3, KNNSettings.getIndexThreadQty());
    } finally {
        // Release node resources even if start-up or the assertion fails
        mockNode.close();
    }
}

private Node createMockNode(Map<String, Object> configSettings) throws IOException {
Path configDir = createTempDir();
File configFile = configDir.resolve("opensearch.yml").toFile();
Expand Down
Loading