2121import static org .apache .solr .security .PermissionNameProvider .Name .CORE_EDIT_PERM ;
2222import static org .apache .solr .security .PermissionNameProvider .Name .CORE_READ_PERM ;
2323
24+ import com .github .benmanes .caffeine .cache .Cache ;
25+ import com .github .benmanes .caffeine .cache .Caffeine ;
26+ import com .github .benmanes .caffeine .cache .Expiry ;
27+ import com .github .benmanes .caffeine .cache .Ticker ;
2428import java .io .File ;
2529import java .lang .invoke .MethodHandles ;
2630import java .util .ArrayList ;
2731import java .util .Collection ;
28- import java .util .Collections ;
2932import java .util .HashMap ;
3033import java .util .Iterator ;
31- import java .util .LinkedHashMap ;
3234import java .util .List ;
3335import java .util .Locale ;
3436import java .util .Map ;
3537import java .util .Map .Entry ;
3638import java .util .concurrent .Callable ;
3739import java .util .concurrent .ExecutorService ;
40+ import java .util .concurrent .TimeUnit ;
3841import org .apache .solr .api .AnnotatedApi ;
3942import org .apache .solr .api .Api ;
4043import org .apache .solr .api .JerseyResource ;
4750import org .apache .solr .common .params .CoreAdminParams ;
4851import org .apache .solr .common .params .ModifiableSolrParams ;
4952import org .apache .solr .common .params .SolrParams ;
53+ import org .apache .solr .common .util .EnvUtils ;
5054import org .apache .solr .common .util .ExecutorUtil ;
5155import org .apache .solr .common .util .NamedList ;
5256import org .apache .solr .common .util .SolrNamedThreadFactory ;
@@ -427,11 +431,19 @@ default boolean isExpensive() {
427431 }
428432
429433 public static class CoreAdminAsyncTracker {
430- private static final int MAX_TRACKED_REQUESTS = 100 ;
434+ /**
435+ * Max number of requests we track in the Caffeine cache. This limit is super high on purpose:
436+ * we're not supposed to hit it. It is just a protection against growing too much in memory when
437+ * receiving an abusive number of admin requests.
438+ */
439+ private static final int MAX_TRACKED_REQUESTS =
440+ EnvUtils .getPropertyAsInteger ("solr.admin.async.max" , 10_000 );
441+
431442 public static final String RUNNING = "running" ;
432443 public static final String COMPLETED = "completed" ;
433444 public static final String FAILED = "failed" ;
434- public final Map <String , Map <String , TaskObject >> requestStatusMap ;
445+
446+ private final Cache <String , TaskObject > requestStatusCache ; // key by ID
435447
436448 // Executor for all standard tasks (the ones that are not flagged as expensive)
437449 // We always keep 50 live threads
@@ -440,33 +452,59 @@ public static class CoreAdminAsyncTracker {
440452 50 , new SolrNamedThreadFactory ("parallelCoreAdminAPIBaseExecutor" ));
441453
442454 // Executor for expensive tasks
443- // We keep the number number of max threads very low to have throttling for expensive tasks
455+ // We keep the number of max threads very low to have throttling for expensive tasks
444456 private ExecutorService expensiveExecutor =
445457 ExecutorUtil .newMDCAwareCachedThreadPool (
446458 5 ,
447459 Integer .MAX_VALUE ,
448460 new SolrNamedThreadFactory ("parallelCoreAdminAPIExpensiveExecutor" ));
449461
/**
 * Creates a tracker that uses the system ticker and reads its two timeouts from configuration.
 *
 * <p>The property {@code solr.admin.async.timeout.minutes} (default 60) is converted to
 * nanoseconds and passed as the running timeout; {@code
 * solr.admin.async.timeout.completed.minutes} (default 5) is converted the same way and passed
 * as the completed timeout.
 */
public CoreAdminAsyncTracker() {
  this(
      Ticker.systemTicker(),
      TimeUnit.NANOSECONDS.convert(
          EnvUtils.getPropertyAsLong("solr.admin.async.timeout.minutes", 60L), TimeUnit.MINUTES),
      TimeUnit.NANOSECONDS.convert(
          EnvUtils.getPropertyAsLong("solr.admin.async.timeout.completed.minutes", 5L),
          TimeUnit.MINUTES));
}
470+
/**
 * Builds the request-status cache: bounded by {@code MAX_TRACKED_REQUESTS} entries, timed by the
 * given ticker, with per-entry lifetimes decided by a {@link TaskExpiry}.
 *
 * @param ticker The time source handed to the cache.
 * @param runningTimeoutNanos The time-to-keep for tasks in the RUNNING state.
 * @param completedTimeoutNanos The time-to-keep for tasks in the COMPLETED or FAILED state after
 *     the status was polled.
 */
CoreAdminAsyncTracker(Ticker ticker, long runningTimeoutNanos, long completedTimeoutNanos) {
  requestStatusCache =
      Caffeine.newBuilder()
          .maximumSize(MAX_TRACKED_REQUESTS)
          .ticker(ticker)
          .expireAfter(new TaskExpiry(runningTimeoutNanos, completedTimeoutNanos))
          .build();
}
457486
458487 public void shutdown () {
459488 ExecutorUtil .shutdownAndAwaitTermination (standardExecutor );
460489 ExecutorUtil .shutdownAndAwaitTermination (expensiveExecutor );
461490 }
462491
463- public Map <String , TaskObject > getRequestStatusMap (String key ) {
464- return requestStatusMap .get (key );
492+ public TaskObject getAsyncRequestForStatus (String key ) {
493+ TaskObject task = requestStatusCache .getIfPresent (key );
494+
495+ if (task != null && !RUNNING .equals (task .status ) && !task .polledAfterCompletion ) {
496+ task .polledAfterCompletion = true ;
497+ // At the first time we retrieve the status of a completed request, do a second lookup in
498+ // the cache. This is necessary to update the TTL of this request in the cache.
499+ // Unfortunately, we can't force the expiration time to be refreshed without a lookup.
500+ requestStatusCache .getIfPresent (key );
501+ }
502+
503+ return task ;
465504 }
466505
467506 public void submitAsyncTask (TaskObject taskObject ) throws SolrException {
468- ensureTaskIdNotInUse (taskObject .taskId );
469- addTask (RUNNING , taskObject );
507+ addTask (taskObject );
470508
471509 Runnable command =
472510 () -> {
@@ -497,42 +535,26 @@ public void submitAsyncTask(TaskObject taskObject) throws SolrException {
497535 }
498536 }
499537
500- /** Helper method to add a task to a tracking type. */
501- private void addTask (String type , TaskObject o , boolean limit ) {
502- synchronized (getRequestStatusMap (type )) {
503- if (limit && getRequestStatusMap (type ).size () == MAX_TRACKED_REQUESTS ) {
504- String key = getRequestStatusMap (type ).entrySet ().iterator ().next ().getKey ();
505- getRequestStatusMap (type ).remove (key );
506- }
507- addTask (type , o );
508- }
509- }
510-
511- private void addTask (String type , TaskObject o ) {
512- synchronized (getRequestStatusMap (type )) {
513- getRequestStatusMap (type ).put (o .taskId , o );
514- }
515- }
516-
517- /** Helper method to remove a task from a tracking map. */
518- private void removeTask (String map , String taskId ) {
519- synchronized (getRequestStatusMap (map )) {
520- getRequestStatusMap (map ).remove (taskId );
521- }
522- }
523-
524- private void ensureTaskIdNotInUse (String taskId ) throws SolrException {
525- if (getRequestStatusMap (RUNNING ).containsKey (taskId )
526- || getRequestStatusMap (COMPLETED ).containsKey (taskId )
527- || getRequestStatusMap (FAILED ).containsKey (taskId )) {
538+ private void addTask (TaskObject taskObject ) {
539+ // Ensure task ID is not already in use
540+ TaskObject taskInCache =
541+ requestStatusCache .get (
542+ taskObject .taskId ,
543+ n -> {
544+ taskObject .status = RUNNING ;
545+ return taskObject ;
546+ });
547+
548+ // If we get a different task instance, it means one was already in the cache with the
549+ // same name. Just reject the new one.
550+ if (taskInCache != taskObject ) {
528551 throw new SolrException (
529552 ErrorCode .BAD_REQUEST , "Duplicate request with the same requestid found." );
530553 }
531554 }
532555
533556 private void finishTask (TaskObject taskObject , boolean successful ) {
534- removeTask (RUNNING , taskObject .taskId );
535- addTask (successful ? COMPLETED : FAILED , taskObject , true );
557+ taskObject .status = successful ? COMPLETED : FAILED ;
536558 }
537559
538560 /**
@@ -546,6 +568,13 @@ public static class TaskObject {
546568 final Callable <SolrQueryResponse > task ;
547569 public String rspInfo ;
548570 public Object operationRspInfo ;
571+ private volatile String status ;
572+
573+ /**
574+ * Flag set to true once the task is complete (can be in error) and the status was polled
575+ * already once. Once set, the time we keep the task status is shortened.
576+ */
577+ private volatile boolean polledAfterCompletion ;
549578
550579 public TaskObject (
551580 String taskId , String action , boolean expensive , Callable <SolrQueryResponse > task ) {
@@ -574,6 +603,42 @@ public Object getOperationRspObject() {
574603 public void setOperationRspObject (SolrQueryResponse rspObject ) {
575604 this .operationRspInfo = rspObject .getResponse ();
576605 }
606+
607+ public String getStatus () {
608+ return status ;
609+ }
610+ }
611+
612+ /**
613+ * Expiration policy for Caffeine cache. Depending on whether the status of a completed task was
614+ * already retrieved, we return {@link #runningTimeoutNanos} or {@link #completedTimeoutNanos}.
615+ */
616+ private static class TaskExpiry implements Expiry <String , TaskObject > {
617+
618+ private final long runningTimeoutNanos ;
619+ private final long completedTimeoutNanos ;
620+
621+ private TaskExpiry (long runningTimeoutNanos , long completedTimeoutNanos ) {
622+ this .runningTimeoutNanos = runningTimeoutNanos ;
623+ this .completedTimeoutNanos = completedTimeoutNanos ;
624+ }
625+
626+ @ Override
627+ public long expireAfterCreate (String key , TaskObject task , long currentTime ) {
628+ return runningTimeoutNanos ;
629+ }
630+
631+ @ Override
632+ public long expireAfterUpdate (
633+ String key , TaskObject task , long currentTime , long currentDuration ) {
634+ return task .polledAfterCompletion ? completedTimeoutNanos : runningTimeoutNanos ;
635+ }
636+
637+ @ Override
638+ public long expireAfterRead (
639+ String key , TaskObject task , long currentTime , long currentDuration ) {
640+ return task .polledAfterCompletion ? completedTimeoutNanos : runningTimeoutNanos ;
641+ }
577642 }
578643 }
579644}
0 commit comments