Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
### Breaking Changes

### Features
* Added `expandCoverage` parameter to LLM judgment API for hybrid document pooling, improving judgment coverage for Hybrid Optimizer experiments ([#400](https://github.com/opensearch-project/search-relevance/pull/400))
* Introduced dynamic percentile-based relevance thresholding for binary-dependent metrics (Precision, MAP) to replace hard-coded `j > 0` mapping ([#394](https://github.com/opensearch-project/search-relevance/pull/394))

### Enhancements
Expand Down
3 changes: 3 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,9 @@ dependencies {
api "org.opensearch:opensearch:${opensearch_version}"
api group: 'org.opensearch', name:'opensearch-ml-client', version: "${opensearch_build}"
compileOnly "org.opensearch:opensearch-job-scheduler-spi:${opensearch_build}"
compileOnly("org.opensearch:opensearch-neural-search:${opensearch_build}") {
transitive = false
}
implementation group: 'com.google.guava', name: 'guava', version:'33.4.8-jre'
compileOnly group: 'org.apache.commons', name: 'commons-lang3', version: '3.20.0'
implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.20.0'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@
import static org.opensearch.searchrelevance.model.JudgmentCache.DOCUMENT_ID;
import static org.opensearch.searchrelevance.model.JudgmentCache.PROMPT_TEMPLATE_ID;
import static org.opensearch.searchrelevance.model.JudgmentCache.QUERY_TEXT;
import static org.opensearch.searchrelevance.model.JudgmentCache.TIME_STAMP;
import static org.opensearch.searchrelevance.utils.ParserUtils.convertListToSortedStr;

import java.io.IOException;
import java.time.Instant;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.StepListener;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.common.xcontent.XContentFactory;
Expand All @@ -28,20 +29,33 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.index.reindex.BulkByScrollResponse;
import org.opensearch.search.SearchHit;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.searchrelevance.exception.SearchRelevanceException;
import org.opensearch.searchrelevance.indices.SearchRelevanceIndicesManager;
import org.opensearch.searchrelevance.model.JudgmentCache;
import org.opensearch.searchrelevance.settings.SearchRelevanceSettingsAccessor;

import lombok.extern.log4j.Log4j2;

@Log4j2
public class JudgmentCacheDao {
private static final Logger LOGGER = LogManager.getLogger(JudgmentCacheDao.class);
private final SearchRelevanceIndicesManager searchRelevanceIndicesManager;
private volatile SearchRelevanceSettingsAccessor settingsAccessor;

public JudgmentCacheDao(SearchRelevanceIndicesManager searchRelevanceIndicesManager) {
this.searchRelevanceIndicesManager = searchRelevanceIndicesManager;
}

/**
* Sets the settings accessor for reading cache TTL configuration.
* Called during plugin initialization after both DAO and settings accessor are created.
*/
public void setSettingsAccessor(SearchRelevanceSettingsAccessor settingsAccessor) {
this.settingsAccessor = settingsAccessor;
}

/**
* Create judgment cache index if not exists
* @param stepListener - step listener for async operation
Expand Down Expand Up @@ -89,14 +103,14 @@ public void upsertJudgmentCache(final JudgmentCache judgmentCache, final ActionL

// Use updateDoc which will create or update the document
searchRelevanceIndicesManager.updateDoc(judgmentCache.id(), content, JUDGMENT_CACHE, ActionListener.wrap(response -> {
LOGGER.debug(
log.debug(
"Successfully upserted judgment cache for queryText: {} and documentId: {}",
judgmentCache.queryText(),
judgmentCache.documentId()
);
listener.onResponse(response);
}, e -> {
LOGGER.error(
log.error(
"Failed to upsert judgment cache for queryText: {} and documentId: {}",
judgmentCache.queryText(),
judgmentCache.documentId(),
Expand All @@ -111,6 +125,53 @@ public void upsertJudgmentCache(final JudgmentCache judgmentCache, final ActionL
}
}

/**
* Cleanup stale cache entries older than the specified TTL.
* This is a fire-and-forget operation — failures are logged but do not block callers.
* @param ttlDays number of days after which cache entries are considered stale
*/
public void cleanupStaleEntries(final long ttlDays) {
long cutoffMillis = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(ttlDays);
String cutoffDate = Instant.ofEpochMilli(cutoffMillis).toString();

log.info("Starting judgment cache cleanup for entries older than {} days (before {})", ttlDays, cutoffDate);

searchRelevanceIndicesManager.deleteByQuery(
QueryBuilders.rangeQuery(TIME_STAMP).lt(cutoffDate),
JUDGMENT_CACHE,
ActionListener.wrap((BulkByScrollResponse response) -> {
long deleted = response.getDeleted();
if (deleted > 0) {
log.info("Judgment cache cleanup completed: deleted {} stale entries older than {} days", deleted, ttlDays);
} else {
log.debug("Judgment cache cleanup completed: no stale entries found older than {} days", ttlDays);
}
}, e -> log.warn("Judgment cache cleanup failed - continuing without cleanup", e))
);
}

/**
* Cleanup stale cache entries based on the configured TTL setting.
* When TTL is disabled (-1, the default), this method is a no-op.
* This is a fire-and-forget operation — failures are logged but do not block callers.
*/
public void cleanupStaleEntries() {
if (settingsAccessor == null) {
log.debug("Settings accessor not set, skipping cache cleanup");
return;
}
long ttlMillis = settingsAccessor.getJudgmentCacheTtl().millis();
if (ttlMillis < 0) {
log.debug("Judgment cache TTL is disabled (-1), skipping cleanup");
return;
}
long ttlDays = TimeUnit.MILLISECONDS.toDays(ttlMillis);
if (ttlDays < 1) {
ttlDays = 1; // minimum 1 day
}
cleanupStaleEntries(ttlDays);
}

/**
* Get judgment cache by queryText and documentId
* @param queryText - queryText to be searched
Expand All @@ -129,7 +190,7 @@ public SearchResponse getJudgmentCache(
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
String contextFieldsStr = contextFields != null ? convertListToSortedStr(contextFields) : "";

LOGGER.debug(
log.debug(
"Building cache search query - queryText: '{}', documentId: '{}', contextFields: '{}', promptTemplateCode: '{}'",
queryText,
documentId,
Expand Down Expand Up @@ -157,7 +218,7 @@ public SearchResponse getJudgmentCache(
}
listener.onResponse(response);
}, e -> {
LOGGER.debug("Cache lookup failed for docId: {} - continuing without cache", documentId);
log.debug("Cache lookup failed for docId: {} - continuing without cache", documentId);
listener.onFailure(e);
});

Expand Down
Loading
Loading