Skip to content

SOLR-17160: Time based tracking of core admin requests with Caffeine cache #2304

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 94 additions & 43 deletions solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,23 @@
import static org.apache.solr.security.PermissionNameProvider.Name.CORE_EDIT_PERM;
import static org.apache.solr.security.PermissionNameProvider.Name.CORE_READ_PERM;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Expiry;
import com.github.benmanes.caffeine.cache.Ticker;
import java.io.File;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.solr.api.AnnotatedApi;
import org.apache.solr.api.Api;
import org.apache.solr.api.JerseyResource;
Expand Down Expand Up @@ -427,11 +430,18 @@ default boolean isExpensive() {
}

public static class CoreAdminAsyncTracker {
private static final int MAX_TRACKED_REQUESTS = 100;
/**
* Max number of requests we track in the Caffeine cache. This limit is super high on
* purpose, we're not supposed to hit it. This is just a protection to grow in memory too
* much when receiving an abusive number of admin requests.
*/
private static final int MAX_TRACKED_REQUESTS = 10_000;

public static final String RUNNING = "running";
public static final String COMPLETED = "completed";
public static final String FAILED = "failed";
public final Map<String, Map<String, TaskObject>> requestStatusMap;

private final Cache<String, TaskObject> requestStatusCache;

// Executor for all standard tasks (the ones that are not flagged as expensive)
// We always keep 50 live threads
Expand All @@ -448,25 +458,51 @@ public static class CoreAdminAsyncTracker {
new SolrNamedThreadFactory("parallelCoreAdminAPIExpensiveExecutor"));

public CoreAdminAsyncTracker() {
HashMap<String, Map<String, TaskObject>> map = new HashMap<>(3, 1.0f);
map.put(RUNNING, Collections.synchronizedMap(new LinkedHashMap<>()));
map.put(COMPLETED, Collections.synchronizedMap(new LinkedHashMap<>()));
map.put(FAILED, Collections.synchronizedMap(new LinkedHashMap<>()));
requestStatusMap = Collections.unmodifiableMap(map);
this(
Ticker.systemTicker(),
TimeUnit.MINUTES.toNanos(
Long.getLong("solr.admin.requests.running.timeout.minutes", 60L)),
TimeUnit.MINUTES.toNanos(
Long.getLong("solr.admin.requests.completed.timeout.minutes", 5L)));
}

/**
* @param runningTimeoutNanos The time-to-keep for tasks in the RUNNING state.
* @param completedTimeoutNanos The time-to-keep for tasks in the COMPLETED or FAILED state
* after the status was polled.
*/
CoreAdminAsyncTracker(Ticker ticker, long runningTimeoutNanos, long completedTimeoutNanos) {

TaskExpiry expiry = new TaskExpiry(runningTimeoutNanos, completedTimeoutNanos);
requestStatusCache =
Caffeine.newBuilder()
.ticker(ticker)
.maximumSize(MAX_TRACKED_REQUESTS)
.expireAfter(expiry)
.build();
}

public void shutdown() {
ExecutorUtil.shutdownAndAwaitTermination(standardExecutor);
ExecutorUtil.shutdownAndAwaitTermination(expensiveExecutor);
}

public Map<String, TaskObject> getRequestStatusMap(String key) {
return requestStatusMap.get(key);
public TaskObject getAsyncRequestForStatus(String key) {
TaskObject task = requestStatusCache.getIfPresent(key);

if (task != null && !RUNNING.equals(task.status) && !task.polled) {
task.polled = true;
// At the first time we retrieve the status of a completed request, do a second lookup in
// the cache. This is necessary to update the TTL of this request in the cache.
// Unfortunately, we can't force the expiration time to be refreshed without a lookup.
requestStatusCache.getIfPresent(key);
}

return task;
}

public void submitAsyncTask(TaskObject taskObject) throws SolrException {
ensureTaskIdNotInUse(taskObject.taskId);
addTask(RUNNING, taskObject);
addTask(taskObject);

Runnable command =
() -> {
Expand Down Expand Up @@ -497,42 +533,19 @@ public void submitAsyncTask(TaskObject taskObject) throws SolrException {
}
}

/** Helper method to add a task to a tracking type. */
private void addTask(String type, TaskObject o, boolean limit) {
synchronized (getRequestStatusMap(type)) {
if (limit && getRequestStatusMap(type).size() == MAX_TRACKED_REQUESTS) {
String key = getRequestStatusMap(type).entrySet().iterator().next().getKey();
getRequestStatusMap(type).remove(key);
}
addTask(type, o);
}
}

private void addTask(String type, TaskObject o) {
synchronized (getRequestStatusMap(type)) {
getRequestStatusMap(type).put(o.taskId, o);
}
}

/** Helper method to remove a task from a tracking map. */
private void removeTask(String map, String taskId) {
synchronized (getRequestStatusMap(map)) {
getRequestStatusMap(map).remove(taskId);
}
}

private void ensureTaskIdNotInUse(String taskId) throws SolrException {
if (getRequestStatusMap(RUNNING).containsKey(taskId)
|| getRequestStatusMap(COMPLETED).containsKey(taskId)
|| getRequestStatusMap(FAILED).containsKey(taskId)) {
private void addTask(TaskObject taskObject) {
// Ensure task ID is not already in use
if (requestStatusCache.asMap().containsKey(taskObject.taskId)) {
throw new SolrException(
ErrorCode.BAD_REQUEST, "Duplicate request with the same requestid found.");
}

taskObject.status = RUNNING;
requestStatusCache.put(taskObject.taskId, taskObject);
}

private void finishTask(TaskObject taskObject, boolean successful) {
removeTask(RUNNING, taskObject.taskId);
addTask(successful ? COMPLETED : FAILED, taskObject, true);
taskObject.status = successful ? COMPLETED : FAILED;
}

/**
Expand All @@ -546,6 +559,8 @@ public static class TaskObject {
final Callable<SolrQueryResponse> task;
public String rspInfo;
public Object operationRspInfo;
private volatile String status;
private volatile boolean polled;

public TaskObject(
String taskId, String action, boolean expensive, Callable<SolrQueryResponse> task) {
Expand Down Expand Up @@ -574,6 +589,42 @@ public Object getOperationRspObject() {
public void setOperationRspObject(SolrQueryResponse rspObject) {
this.operationRspInfo = rspObject.getResponse();
}

public String getStatus() {
return status;
}
}

/**
* Expiration policy for Caffeine cache. Depending on whether the status of a completed task was
* already retrieved, we return {@link #runningTimeoutNanos} or {@link #completedTimeoutNanos}.
*/
private static class TaskExpiry implements Expiry<String, TaskObject> {

private final long runningTimeoutNanos;
private final long completedTimeoutNanos;

private TaskExpiry(long runningTimeoutNanos, long completedTimeoutNanos) {
this.runningTimeoutNanos = runningTimeoutNanos;
this.completedTimeoutNanos = completedTimeoutNanos;
}

@Override
public long expireAfterCreate(String key, TaskObject task, long currentTime) {
return runningTimeoutNanos;
}

@Override
public long expireAfterUpdate(
String key, TaskObject task, long currentTime, long currentDuration) {
return task.polled ? completedTimeoutNanos : runningTimeoutNanos;
}

@Override
public long expireAfterRead(
String key, TaskObject task, long currentTime, long currentDuration) {
return task.polled ? completedTimeoutNanos : runningTimeoutNanos;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@

import static org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminAsyncTracker.COMPLETED;
import static org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminAsyncTracker.FAILED;
import static org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminAsyncTracker.RUNNING;

import jakarta.inject.Inject;
import org.apache.solr.client.api.endpoint.GetNodeCommandStatusApi;
import org.apache.solr.client.api.model.GetNodeCommandStatusResponse;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.handler.admin.CoreAdminHandler;
import org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminAsyncTracker.TaskObject;
import org.apache.solr.jersey.PermissionName;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
Expand All @@ -51,25 +51,24 @@ public GetNodeCommandStatus(
public GetNodeCommandStatusResponse getCommandStatus(String requestId) {
ensureRequiredParameterProvided(CoreAdminParams.REQUESTID, requestId);
var requestStatusResponse = new GetNodeCommandStatusResponse();
if (coreAdminAsyncTracker.getRequestStatusMap(RUNNING).containsKey(requestId)) {
requestStatusResponse.status = RUNNING;
} else if (coreAdminAsyncTracker.getRequestStatusMap(COMPLETED).containsKey(requestId)) {
requestStatusResponse.status = COMPLETED;
requestStatusResponse.response =
coreAdminAsyncTracker.getRequestStatusMap(COMPLETED).get(requestId).getRspObject();
requestStatusResponse.response =
coreAdminAsyncTracker
.getRequestStatusMap(COMPLETED)
.get(requestId)
.getOperationRspObject();
} else if (coreAdminAsyncTracker.getRequestStatusMap(FAILED).containsKey(requestId)) {
requestStatusResponse.status = FAILED;
requestStatusResponse.response =
coreAdminAsyncTracker.getRequestStatusMap(FAILED).get(requestId).getRspObject();
} else {

TaskObject taskObject = coreAdminAsyncTracker.getAsyncRequestForStatus(requestId);
String status = taskObject != null ? taskObject.getStatus() : null;

if (status == null) {
requestStatusResponse.status = "notfound";
requestStatusResponse.msg = "No task found in running, completed or failed tasks";
} else {
requestStatusResponse.status = status;

if (taskObject.getStatus().equals(COMPLETED)) {
requestStatusResponse.response = taskObject.getRspObject();
requestStatusResponse.response = taskObject.getOperationRspObject();
Comment on lines +65 to +66
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused; you are setting the response twice?

Copy link
Contributor Author

@psalagnac psalagnac Mar 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, good catch!
Something is definitely wrong here, but this is existing code. I did not notice that.
I think this deserves a a fix by itself. Values for msg and response in the V2 API response are not consistent depending on the status. And it seems to me they're not consistent with V1 either.
I think this was introduced with #2144.

Maybe I should put this PR on hold and fix this first?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah; didn't know it was existing code. Do in whichever order you want; it doesn't really matter.

} else if (taskObject.getStatus().equals(FAILED)) {
requestStatusResponse.response = taskObject.getRspObject();
}
}

return requestStatusResponse;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.solr.handler.admin;

import static org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminAsyncTracker.COMPLETED;

import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
Expand All @@ -24,7 +26,9 @@
import java.nio.file.StandardCopyOption;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.io.file.PathUtils;
import org.apache.lucene.util.Constants;
import org.apache.solr.SolrTestCaseJ4;
Expand All @@ -43,6 +47,8 @@
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.SolrCore;
import org.apache.solr.embedded.JettySolrRunner;
import org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminAsyncTracker;
import org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminAsyncTracker.TaskObject;
import org.apache.solr.response.SolrQueryResponse;
import org.junit.BeforeClass;
import org.junit.Test;
Expand Down Expand Up @@ -528,4 +534,39 @@ public void testNonexistentCoreReload() throws Exception {
e.getMessage());
admin.close();
}

@Test
public void testTrackedRequestExpiration() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wonderful test

// Create a tracker with controlled clock, relative to 0
AtomicLong clock = new AtomicLong(0L);
CoreAdminAsyncTracker asyncTracker = new CoreAdminAsyncTracker(clock::get, 100L, 10L);
try {
Set<TaskObject> tasks =
Set.of(
new TaskObject("id1", "ACTION", false, SolrQueryResponse::new),
new TaskObject("id2", "ACTION", false, SolrQueryResponse::new));

// Submit all tasks and wait for internal status to be COMPLETED
tasks.forEach(asyncTracker::submitAsyncTask);
while (!tasks.stream().allMatch(t -> COMPLETED.equals(t.getStatus()))) {
Thread.sleep(10L);
}

// Timeout for running tasks is 100n, so status can be retrieved after 20n.
// But timeout for complete tasks is 10n once we polled the status at least once, so status
// is not available anymore 20n later.
clock.set(20);
assertEquals(COMPLETED, asyncTracker.getAsyncRequestForStatus("id1").getStatus());
clock.set(40L);
assertNull(asyncTracker.getAsyncRequestForStatus("id1"));

// Move the clock after the running timeout.
// Status of second task is not available anymore, even if it wasn't retrieved yet
clock.set(110L);
assertNull(asyncTracker.getAsyncRequestForStatus("id2"));

} finally {
asyncTracker.shutdown();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Map;
import org.apache.solr.SolrTestCase;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.api.endpoint.GetNodeCommandStatusApi;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.handler.admin.CoreAdminHandler;
import org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminAsyncTracker.TaskObject;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
Expand All @@ -33,7 +33,6 @@
public class GetNodeCommandStatusTest extends SolrTestCase {
private CoreContainer mockCoreContainer;
private CoreAdminHandler.CoreAdminAsyncTracker mockAsyncTracker;
private CoreAdminHandler.CoreAdminAsyncTracker.TaskObject taskObject;
private GetNodeCommandStatusApi requestNodeCommandApi;

@BeforeClass
Expand All @@ -45,7 +44,6 @@ public static void ensureWorkingMockito() {
public void setupMocks() {
mockCoreContainer = mock(CoreContainer.class);
mockAsyncTracker = mock(CoreAdminHandler.CoreAdminAsyncTracker.class);
taskObject = new CoreAdminHandler.CoreAdminAsyncTracker.TaskObject(null, null, false, null);
requestNodeCommandApi =
new GetNodeCommandStatus(mockCoreContainer, mockAsyncTracker, null, null);
}
Expand Down Expand Up @@ -89,6 +87,9 @@ public void testReturnsStatusOfFailedCommandId() {
}

private void whenTaskExistsWithStatus(String taskId, String status) {
when(mockAsyncTracker.getRequestStatusMap(status)).thenReturn(Map.of(taskId, taskObject));
TaskObject taskObject = mock(TaskObject.class);
when(taskObject.getStatus()).thenReturn(status);

when(mockAsyncTracker.getAsyncRequestForStatus(taskId)).thenReturn(taskObject);
}
}