Skip to content

Commit 338fbda

Browse files
psalagnacdsmiley
andcommitted
SOLR-17160: Core admin async ID status, 10k limit and time expire (#2304)
Core Admin "async" request status tracking is no longer capped at 100; it's 10k. Statuses are now removed 5 minutes after the read of a completed/failed status. Helps collection async backup/restore and other operations scale to 100+ shards. Co-authored-by: David Smiley <dsmiley@salesforce.com> (cherry picked from commit d3b4c2e)
1 parent d684934 commit 338fbda

File tree

6 files changed

+214
-65
lines changed

6 files changed

+214
-65
lines changed

solr/CHANGES.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,10 @@ Improvements
4848

4949
* SOLR-16198: Introduce tabbed sections again in the Ref Guide. (Christos Malliaridis via Eric Pugh)
5050

51+
* SOLR-17160: Core Admin "async" request status tracking is no longer capped at 100; it's 10k.
52+
Statuses are now removed 5 minutes after the read of a completed/failed status. Helps collection
53+
async backup/restore and other operations scale to 100+ shards. (Pierre Salagnac, David Smiley)
54+
5155
Optimizations
5256
---------------------
5357
* SOLR-17257: Both Minimize Cores and the Affinity replica placement strategies would over-gather

solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java

Lines changed: 109 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -21,20 +21,23 @@
2121
import static org.apache.solr.security.PermissionNameProvider.Name.CORE_EDIT_PERM;
2222
import static org.apache.solr.security.PermissionNameProvider.Name.CORE_READ_PERM;
2323

24+
import com.github.benmanes.caffeine.cache.Cache;
25+
import com.github.benmanes.caffeine.cache.Caffeine;
26+
import com.github.benmanes.caffeine.cache.Expiry;
27+
import com.github.benmanes.caffeine.cache.Ticker;
2428
import java.io.File;
2529
import java.lang.invoke.MethodHandles;
2630
import java.util.ArrayList;
2731
import java.util.Collection;
28-
import java.util.Collections;
2932
import java.util.HashMap;
3033
import java.util.Iterator;
31-
import java.util.LinkedHashMap;
3234
import java.util.List;
3335
import java.util.Locale;
3436
import java.util.Map;
3537
import java.util.Map.Entry;
3638
import java.util.concurrent.Callable;
3739
import java.util.concurrent.ExecutorService;
40+
import java.util.concurrent.TimeUnit;
3841
import org.apache.solr.api.AnnotatedApi;
3942
import org.apache.solr.api.Api;
4043
import org.apache.solr.api.JerseyResource;
@@ -47,6 +50,7 @@
4750
import org.apache.solr.common.params.CoreAdminParams;
4851
import org.apache.solr.common.params.ModifiableSolrParams;
4952
import org.apache.solr.common.params.SolrParams;
53+
import org.apache.solr.common.util.EnvUtils;
5054
import org.apache.solr.common.util.ExecutorUtil;
5155
import org.apache.solr.common.util.NamedList;
5256
import org.apache.solr.common.util.SolrNamedThreadFactory;
@@ -427,11 +431,19 @@ default boolean isExpensive() {
427431
}
428432

429433
public static class CoreAdminAsyncTracker {
430-
private static final int MAX_TRACKED_REQUESTS = 100;
434+
/**
435+
* Max number of requests we track in the Caffeine cache. This limit is super high on purpose,
436+
* we're not supposed to hit it. This is just a protection to grow in memory too much when
437+
* receiving an abusive number of admin requests.
438+
*/
439+
private static final int MAX_TRACKED_REQUESTS =
440+
EnvUtils.getPropertyAsInteger("solr.admin.async.max", 10_000);
441+
431442
public static final String RUNNING = "running";
432443
public static final String COMPLETED = "completed";
433444
public static final String FAILED = "failed";
434-
public final Map<String, Map<String, TaskObject>> requestStatusMap;
445+
446+
private final Cache<String, TaskObject> requestStatusCache; // key by ID
435447

436448
// Executor for all standard tasks (the ones that are not flagged as expensive)
437449
// We always keep 50 live threads
@@ -440,33 +452,59 @@ public static class CoreAdminAsyncTracker {
440452
50, new SolrNamedThreadFactory("parallelCoreAdminAPIBaseExecutor"));
441453

442454
// Executor for expensive tasks
443-
// We keep the number number of max threads very low to have throttling for expensive tasks
455+
// We keep the number of max threads very low to have throttling for expensive tasks
444456
private ExecutorService expensiveExecutor =
445457
ExecutorUtil.newMDCAwareCachedThreadPool(
446458
5,
447459
Integer.MAX_VALUE,
448460
new SolrNamedThreadFactory("parallelCoreAdminAPIExpensiveExecutor"));
449461

450462
public CoreAdminAsyncTracker() {
451-
HashMap<String, Map<String, TaskObject>> map = new HashMap<>(3, 1.0f);
452-
map.put(RUNNING, Collections.synchronizedMap(new LinkedHashMap<>()));
453-
map.put(COMPLETED, Collections.synchronizedMap(new LinkedHashMap<>()));
454-
map.put(FAILED, Collections.synchronizedMap(new LinkedHashMap<>()));
455-
requestStatusMap = Collections.unmodifiableMap(map);
463+
this(
464+
Ticker.systemTicker(),
465+
TimeUnit.MINUTES.toNanos(
466+
EnvUtils.getPropertyAsLong("solr.admin.async.timeout.minutes", 60L)),
467+
TimeUnit.MINUTES.toNanos(
468+
EnvUtils.getPropertyAsLong("solr.admin.async.timeout.completed.minutes", 5L)));
469+
}
470+
471+
/**
472+
* @param runningTimeoutNanos The time-to-keep for tasks in the RUNNING state.
473+
* @param completedTimeoutNanos The time-to-keep for tasks in the COMPLETED or FAILED state
474+
* after the status was polled.
475+
*/
476+
CoreAdminAsyncTracker(Ticker ticker, long runningTimeoutNanos, long completedTimeoutNanos) {
477+
478+
TaskExpiry expiry = new TaskExpiry(runningTimeoutNanos, completedTimeoutNanos);
479+
requestStatusCache =
480+
Caffeine.newBuilder()
481+
.ticker(ticker)
482+
.maximumSize(MAX_TRACKED_REQUESTS)
483+
.expireAfter(expiry)
484+
.build();
456485
}
457486

458487
public void shutdown() {
459488
ExecutorUtil.shutdownAndAwaitTermination(standardExecutor);
460489
ExecutorUtil.shutdownAndAwaitTermination(expensiveExecutor);
461490
}
462491

463-
public Map<String, TaskObject> getRequestStatusMap(String key) {
464-
return requestStatusMap.get(key);
492+
public TaskObject getAsyncRequestForStatus(String key) {
493+
TaskObject task = requestStatusCache.getIfPresent(key);
494+
495+
if (task != null && !RUNNING.equals(task.status) && !task.polledAfterCompletion) {
496+
task.polledAfterCompletion = true;
497+
// At the first time we retrieve the status of a completed request, do a second lookup in
498+
// the cache. This is necessary to update the TTL of this request in the cache.
499+
// Unfortunately, we can't force the expiration time to be refreshed without a lookup.
500+
requestStatusCache.getIfPresent(key);
501+
}
502+
503+
return task;
465504
}
466505

467506
public void submitAsyncTask(TaskObject taskObject) throws SolrException {
468-
ensureTaskIdNotInUse(taskObject.taskId);
469-
addTask(RUNNING, taskObject);
507+
addTask(taskObject);
470508

471509
Runnable command =
472510
() -> {
@@ -497,42 +535,26 @@ public void submitAsyncTask(TaskObject taskObject) throws SolrException {
497535
}
498536
}
499537

500-
/** Helper method to add a task to a tracking type. */
501-
private void addTask(String type, TaskObject o, boolean limit) {
502-
synchronized (getRequestStatusMap(type)) {
503-
if (limit && getRequestStatusMap(type).size() == MAX_TRACKED_REQUESTS) {
504-
String key = getRequestStatusMap(type).entrySet().iterator().next().getKey();
505-
getRequestStatusMap(type).remove(key);
506-
}
507-
addTask(type, o);
508-
}
509-
}
510-
511-
private void addTask(String type, TaskObject o) {
512-
synchronized (getRequestStatusMap(type)) {
513-
getRequestStatusMap(type).put(o.taskId, o);
514-
}
515-
}
516-
517-
/** Helper method to remove a task from a tracking map. */
518-
private void removeTask(String map, String taskId) {
519-
synchronized (getRequestStatusMap(map)) {
520-
getRequestStatusMap(map).remove(taskId);
521-
}
522-
}
523-
524-
private void ensureTaskIdNotInUse(String taskId) throws SolrException {
525-
if (getRequestStatusMap(RUNNING).containsKey(taskId)
526-
|| getRequestStatusMap(COMPLETED).containsKey(taskId)
527-
|| getRequestStatusMap(FAILED).containsKey(taskId)) {
538+
private void addTask(TaskObject taskObject) {
539+
// Ensure task ID is not already in use
540+
TaskObject taskInCache =
541+
requestStatusCache.get(
542+
taskObject.taskId,
543+
n -> {
544+
taskObject.status = RUNNING;
545+
return taskObject;
546+
});
547+
548+
// If we get a different task instance, it means one was already in the cache with the
549+
// same name. Just reject the new one.
550+
if (taskInCache != taskObject) {
528551
throw new SolrException(
529552
ErrorCode.BAD_REQUEST, "Duplicate request with the same requestid found.");
530553
}
531554
}
532555

533556
private void finishTask(TaskObject taskObject, boolean successful) {
534-
removeTask(RUNNING, taskObject.taskId);
535-
addTask(successful ? COMPLETED : FAILED, taskObject, true);
557+
taskObject.status = successful ? COMPLETED : FAILED;
536558
}
537559

538560
/**
@@ -546,6 +568,13 @@ public static class TaskObject {
546568
final Callable<SolrQueryResponse> task;
547569
public String rspInfo;
548570
public Object operationRspInfo;
571+
private volatile String status;
572+
573+
/**
574+
* Flag set to true once the task is complete (can be in error) and the status was polled
575+
* already once. Once set, the time we keep the task status is shortened.
576+
*/
577+
private volatile boolean polledAfterCompletion;
549578

550579
public TaskObject(
551580
String taskId, String action, boolean expensive, Callable<SolrQueryResponse> task) {
@@ -574,6 +603,42 @@ public Object getOperationRspObject() {
574603
public void setOperationRspObject(SolrQueryResponse rspObject) {
575604
this.operationRspInfo = rspObject.getResponse();
576605
}
606+
607+
public String getStatus() {
608+
return status;
609+
}
610+
}
611+
612+
/**
613+
* Expiration policy for Caffeine cache. Depending on whether the status of a completed task was
614+
* already retrieved, we return {@link #runningTimeoutNanos} or {@link #completedTimeoutNanos}.
615+
*/
616+
private static class TaskExpiry implements Expiry<String, TaskObject> {
617+
618+
private final long runningTimeoutNanos;
619+
private final long completedTimeoutNanos;
620+
621+
private TaskExpiry(long runningTimeoutNanos, long completedTimeoutNanos) {
622+
this.runningTimeoutNanos = runningTimeoutNanos;
623+
this.completedTimeoutNanos = completedTimeoutNanos;
624+
}
625+
626+
@Override
627+
public long expireAfterCreate(String key, TaskObject task, long currentTime) {
628+
return runningTimeoutNanos;
629+
}
630+
631+
@Override
632+
public long expireAfterUpdate(
633+
String key, TaskObject task, long currentTime, long currentDuration) {
634+
return task.polledAfterCompletion ? completedTimeoutNanos : runningTimeoutNanos;
635+
}
636+
637+
@Override
638+
public long expireAfterRead(
639+
String key, TaskObject task, long currentTime, long currentDuration) {
640+
return task.polledAfterCompletion ? completedTimeoutNanos : runningTimeoutNanos;
641+
}
577642
}
578643
}
579644
}

solr/core/src/java/org/apache/solr/handler/admin/api/GetNodeCommandStatus.java

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,14 @@
1818

1919
import static org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminAsyncTracker.COMPLETED;
2020
import static org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminAsyncTracker.FAILED;
21-
import static org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminAsyncTracker.RUNNING;
2221

2322
import jakarta.inject.Inject;
2423
import org.apache.solr.client.api.endpoint.GetNodeCommandStatusApi;
2524
import org.apache.solr.client.api.model.GetNodeCommandStatusResponse;
2625
import org.apache.solr.common.params.CoreAdminParams;
2726
import org.apache.solr.core.CoreContainer;
2827
import org.apache.solr.handler.admin.CoreAdminHandler;
28+
import org.apache.solr.handler.admin.CoreAdminHandler.CoreAdminAsyncTracker.TaskObject;
2929
import org.apache.solr.jersey.PermissionName;
3030
import org.apache.solr.request.SolrQueryRequest;
3131
import org.apache.solr.response.SolrQueryResponse;
@@ -51,25 +51,24 @@ public GetNodeCommandStatus(
5151
public GetNodeCommandStatusResponse getCommandStatus(String requestId) {
5252
ensureRequiredParameterProvided(CoreAdminParams.REQUESTID, requestId);
5353
var requestStatusResponse = new GetNodeCommandStatusResponse();
54-
if (coreAdminAsyncTracker.getRequestStatusMap(RUNNING).containsKey(requestId)) {
55-
requestStatusResponse.status = RUNNING;
56-
} else if (coreAdminAsyncTracker.getRequestStatusMap(COMPLETED).containsKey(requestId)) {
57-
requestStatusResponse.status = COMPLETED;
58-
requestStatusResponse.response =
59-
coreAdminAsyncTracker.getRequestStatusMap(COMPLETED).get(requestId).getRspObject();
60-
requestStatusResponse.response =
61-
coreAdminAsyncTracker
62-
.getRequestStatusMap(COMPLETED)
63-
.get(requestId)
64-
.getOperationRspObject();
65-
} else if (coreAdminAsyncTracker.getRequestStatusMap(FAILED).containsKey(requestId)) {
66-
requestStatusResponse.status = FAILED;
67-
requestStatusResponse.response =
68-
coreAdminAsyncTracker.getRequestStatusMap(FAILED).get(requestId).getRspObject();
69-
} else {
54+
55+
TaskObject taskObject = coreAdminAsyncTracker.getAsyncRequestForStatus(requestId);
56+
String status = taskObject != null ? taskObject.getStatus() : null;
57+
58+
if (status == null) {
7059
requestStatusResponse.status = "notfound";
7160
requestStatusResponse.msg = "No task found in running, completed or failed tasks";
61+
} else {
62+
requestStatusResponse.status = status;
63+
64+
if (status.equals(COMPLETED)) {
65+
requestStatusResponse.response = taskObject.getRspObject();
66+
requestStatusResponse.response = taskObject.getOperationRspObject();
67+
} else if (status.equals(FAILED)) {
68+
requestStatusResponse.response = taskObject.getRspObject();
69+
}
7270
}
71+
7372
return requestStatusResponse;
7473
}
7574
}

0 commit comments

Comments
 (0)