Skip to content

Commit

Permalink
Enabled concurrent graph creation for Lucene engine with index thread qty settings
Browse files Browse the repository at this point in the history

Signed-off-by: Navneet Verma <[email protected]>
  • Loading branch information
navneet1v committed Jan 31, 2025
1 parent 811ae2e commit 3864c69
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 7 deletions.
8 changes: 8 additions & 0 deletions src/main/java/org/opensearch/knn/index/KNNSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -737,6 +737,14 @@ public void onIndexModule(IndexModule module) {
});
}

/**
 * Looks up the index thread quantity ({@code KNN_ALGO_PARAM_INDEX_THREAD_QTY}) through the
 * shared {@link KNNSettings} state.
 *
 * @return the index thread quantity reported by the settings state
 */
public static int getIndexThreadQty() {
    final Integer indexThreadQty = KNNSettings.state().getSettingValue(KNN_ALGO_PARAM_INDEX_THREAD_QTY);
    return indexThreadQty;
}

/**
 * Renders a percentage value as text by appending a percent sign.
 *
 * @param percentage the numeric percentage to render
 * @return the textual form of {@code percentage} immediately followed by {@code %}
 */
private static String percentageAsString(Integer percentage) {
    final StringBuilder rendered = new StringBuilder();
    rendered.append(percentage);
    rendered.append('%');
    return rendered.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,19 @@
import org.apache.lucene.codecs.lucene99.Lucene99HnswScalarQuantizedVectorsFormat;
import org.apache.lucene.codecs.lucene99.Lucene99HnswVectorsFormat;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.index.SpaceType;
import org.opensearch.knn.index.codec.BasePerFieldKnnVectorsFormat;
import org.opensearch.knn.index.engine.KNNEngine;

import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* Class provides per field format implementation for Lucene Knn vector type
*/
public class KNN9120PerFieldKnnVectorsFormat extends BasePerFieldKnnVectorsFormat {
private static final int NUM_MERGE_WORKERS = 1;

public KNN9120PerFieldKnnVectorsFormat(final Optional<MapperService> mapperService) {
super(
Expand All @@ -41,23 +43,31 @@ public KNN9120PerFieldKnnVectorsFormat(final Optional<MapperService> mapperServi
knnScalarQuantizedVectorsFormatParams -> new Lucene99HnswScalarQuantizedVectorsFormat(
knnScalarQuantizedVectorsFormatParams.getMaxConnections(),
knnScalarQuantizedVectorsFormatParams.getBeamWidth(),
NUM_MERGE_WORKERS,
getMergeThreadCount(),
knnScalarQuantizedVectorsFormatParams.getBits(),
knnScalarQuantizedVectorsFormatParams.isCompressFlag(),
knnScalarQuantizedVectorsFormatParams.getConfidenceInterval(),
null
getMergeExecutorService()
)
);
}

/**
 * Returns the maximum vector dimension that {@link KNNEngine} reports for the Lucene engine.
 *
 * @param fieldName Name of the field, ignored
 * @return the value of {@code KNNEngine.getMaxDimensionByEngine(KNNEngine.LUCENE)}
 */
@Override
public int getMaxDimensions(String fieldName) {
    // A single @Override placed after the Javadoc: the annotation is not repeatable, so the
    // stray copy that preceded the comment has been dropped.
    return KNNEngine.getMaxDimensionByEngine(KNNEngine.LUCENE);
}

/**
 * Supplies the number of merge workers handed to the Lucene vectors format.
 *
 * @return the value of {@link KNNSettings#getIndexThreadQty()}
 */
private static int getMergeThreadCount() {
    final int indexThreadQty = KNNSettings.getIndexThreadQty();
    return indexThreadQty;
}

/**
 * Builds the executor handed to the Lucene vectors format for concurrent graph merges.
 *
 * <p>Returns {@code null} when the merge thread count is 1 or less. Lucene's HNSW formats
 * reject a non-null executor when the merge worker count is 1, and
 * {@link Executors#newFixedThreadPool(int)} throws {@link IllegalArgumentException} for
 * non-positive sizes.
 *
 * @return a fixed-size thread pool with {@link #getMergeThreadCount()} threads, or
 *         {@code null} when that count is 1 or less
 */
private static ExecutorService getMergeExecutorService() {
    // Read the setting once so the guard and the pool size use the same value.
    final int mergeThreadCount = getMergeThreadCount();
    if (mergeThreadCount <= 1) {
        return null;
    }
    // NOTE(review): each call creates a new pool and nothing visible here shuts it down;
    // confirm who owns the executor's lifecycle.
    return Executors.newFixedThreadPool(mergeThreadCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ private Map<String, Object> getParameters(FieldInfo fieldInfo, VectorDataType ve
maybeAddBinaryPrefixForFaissBWC(knnEngine, parameters, fieldAttributes);

// Used to determine how many threads to use when indexing
parameters.put(KNNConstants.INDEX_THREAD_QTY, KNNSettings.state().getSettingValue(KNNSettings.KNN_ALGO_PARAM_INDEX_THREAD_QTY));
parameters.put(KNNConstants.INDEX_THREAD_QTY, KNNSettings.getIndexThreadQty());

return parameters;
}
Expand Down Expand Up @@ -258,7 +258,7 @@ private void maybeAddBinaryPrefixForFaissBWC(KNNEngine knnEngine, Map<String, Ob

private Map<String, Object> getTemplateParameters(FieldInfo fieldInfo, Model model) throws IOException {
Map<String, Object> parameters = new HashMap<>();
parameters.put(KNNConstants.INDEX_THREAD_QTY, KNNSettings.state().getSettingValue(KNNSettings.KNN_ALGO_PARAM_INDEX_THREAD_QTY));
parameters.put(KNNConstants.INDEX_THREAD_QTY, KNNSettings.getIndexThreadQty());
parameters.put(KNNConstants.MODEL_ID, fieldInfo.attributes().get(MODEL_ID));
parameters.put(KNNConstants.MODEL_BLOB_PARAMETER, model.getModelBlob());
if (FieldInfoExtractor.extractQuantizationConfig(fieldInfo) != QuantizationConfig.EMPTY) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/opensearch/knn/training/TrainingJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ public void run() {
Map<String, Object> trainParameters = libraryIndexingContext.getLibraryParameters();
trainParameters.put(
KNNConstants.INDEX_THREAD_QTY,
KNNSettings.state().getSettingValue(KNNSettings.KNN_ALGO_PARAM_INDEX_THREAD_QTY)
KNNSettings.getIndexThreadQty()
);

if (libraryIndexingContext.getQuantizationConfig() != QuantizationConfig.EMPTY) {
Expand Down
11 changes: 11 additions & 0 deletions src/test/java/org/opensearch/knn/index/KNNSettingsTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,17 @@ public void testGetFaissAVX2DisabledSettingValueFromConfig_enableSetting_thenVal
assertEquals(expectedKNNFaissAVX2Disabled, actualKNNFaissAVX2Disabled);
}

/**
 * Starts a mock node configured with {@code KNN_ALGO_PARAM_INDEX_THREAD_QTY = 3}, wires its
 * {@link ClusterService} into {@link KNNSettings}, and checks that
 * {@link KNNSettings#getIndexThreadQty()} returns the configured value.
 */
@SneakyThrows
public void testGetIndexThreadQty_WithDifferentValues_thenSuccess() {
    final int expectedThreadQty = 3;
    final int threadQty;
    Node mockNode = createMockNode(Map.of(KNNSettings.KNN_ALGO_PARAM_INDEX_THREAD_QTY, expectedThreadQty));
    try {
        mockNode.start();
        ClusterService clusterService = mockNode.injector().getInstance(ClusterService.class);
        KNNSettings.state().setClusterService(clusterService);
        threadQty = KNNSettings.getIndexThreadQty();
    } finally {
        // Close the node even when start-up or the settings lookup throws, so a failure
        // does not leak it.
        mockNode.close();
    }
    assertEquals(expectedThreadQty, threadQty);
}

private Node createMockNode(Map<String, Object> configSettings) throws IOException {
Path configDir = createTempDir();
File configFile = configDir.resolve("opensearch.yml").toFile();
Expand Down

0 comments on commit 3864c69

Please sign in to comment.