Commit ecb1fcd
[ML] Ensure that anomaly detection job state update retries if master node is temporarily unavailable (#129391) (#129403)
During a cluster upgrade, anomaly detection jobs must be reassigned from one ML node to another. During this reassignment, the jobs transition through several states, including "opening" and "opened". If the master node becomes temporarily unavailable during this transition, e.g., because it is itself being reassigned, the new job state is not committed to the cluster state. Once a new master becomes available, the cluster state is then inconsistent: some anomaly detection jobs are actually open, but their recorded state is stuck at "opening". This PR introduces a retryable action for updating the job state, ensuring that the update eventually succeeds and the cluster state remains consistent during the upgrade. Fixes #126148
1 parent: d9dd6ae
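The fix is built on Elasticsearch's org.elasticsearch.action.support.RetryableAction, which the diff below subclasses. As a minimal sketch of that contract, assuming the 8.x import locations (the class name RetryableActionSketch and method retryingNoOp are illustrative, not part of the commit): run() makes the first attempt via tryAction(...); a failed attempt is retried after a growing backoff delay as long as shouldRetry(e) returns true and the overall timeout has not elapsed, and the wrapped listener is completed exactly once with the final outcome.

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.RetryableAction;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;

// Illustrative sketch only; RetryableActionSketch and retryingNoOp are hypothetical names.
final class RetryableActionSketch {

    private static final Logger logger = LogManager.getLogger(RetryableActionSketch.class);

    // Runs a no-op "attempt" under the same retry policy the fix uses:
    // first retry after 500 ms, growing backoff, give up after 30 seconds.
    static void retryingNoOp(ThreadPool threadPool, ActionListener<Void> listener) {
        new RetryableAction<Void>(
            logger,                          // logs each retry attempt
            threadPool,                      // schedules the delayed retries
            TimeValue.timeValueMillis(500),  // initial sleep between attempts
            TimeValue.timeValueSeconds(30),  // overall retry timeout
            listener,                        // completed exactly once with the final outcome
            threadPool.generic()             // executor the attempts run on
        ) {
            @Override
            public void tryAction(ActionListener<Void> attemptListener) {
                // One attempt; must complete the listener via onResponse(...) or onFailure(...).
                attemptListener.onResponse(null);
            }

            @Override
            public boolean shouldRetry(Exception e) {
                return true; // treat every failure as transient
            }
        }.run();
    }
}

The commit's UpdateStateRetryableAction, shown in the AutodetectProcessManager.java diff below, instantiates exactly this pattern, with jobTask.updatePersistentTaskState(jobTaskState, listener) as the attempt.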

File tree

3 files changed: +205 -4 lines changed

- docs/changelog/129391.yaml
- x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java
- x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java

docs/changelog/129391.yaml

Lines changed: 7 additions & 0 deletions

@@ -0,0 +1,7 @@
+pr: 129391
+summary: Ensure that anomaly detection job state update retries if master node is
+  temporarily unavailable
+area: Machine Learning
+type: bug
+issues:
+ - 126148

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManager.java

Lines changed: 57 additions & 4 deletions

@@ -11,6 +11,7 @@
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.support.RetryableAction;
 import org.elasticsearch.client.internal.Client;
 import org.elasticsearch.cluster.ClusterChangedEvent;
 import org.elasticsearch.cluster.ClusterState;
@@ -29,6 +30,7 @@
 import org.elasticsearch.core.Tuple;
 import org.elasticsearch.index.analysis.AnalysisRegistry;
 import org.elasticsearch.indices.InvalidAliasNameException;
+import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.xcontent.NamedXContentRegistry;
@@ -1002,13 +1004,17 @@ public Optional<Duration> jobOpenTime(JobTask jobTask) {
 
     void setJobState(JobTask jobTask, JobState state, String reason) {
         JobTaskState jobTaskState = new JobTaskState(state, jobTask.getAllocationId(), reason, Instant.now());
-        jobTask.updatePersistentTaskState(
+        // retry state update to ensure that cluster state stays consistent
+        new UpdateStateRetryableAction(
+            logger,
+            threadPool,
+            jobTask,
             jobTaskState,
             ActionListener.wrap(
                 persistentTask -> logger.info("Successfully set job state to [{}] for job [{}]", state, jobTask.getJobId()),
                 e -> logSetJobStateFailure(state, jobTask.getJobId(), e)
             )
-        );
+        ).run();
     }
 
     private static void logSetJobStateFailure(JobState state, String jobId, Exception e) {
@@ -1021,7 +1027,8 @@ private static void logSetJobStateFailure(JobState state, String jobId, Exception e) {
 
     void setJobState(JobTask jobTask, JobState state, String reason, CheckedConsumer<Exception, IOException> handler) {
         JobTaskState jobTaskState = new JobTaskState(state, jobTask.getAllocationId(), reason, Instant.now());
-        jobTask.updatePersistentTaskState(jobTaskState, ActionListener.wrap(persistentTask -> {
+        // retry state update to ensure that cluster state stays consistent
+        new UpdateStateRetryableAction(logger, threadPool, jobTask, jobTaskState, ActionListener.wrap(persistentTask -> {
             try {
                 handler.accept(null);
             } catch (IOException e1) {
@@ -1033,7 +1040,7 @@ void setJobState(JobTask jobTask, JobState state, String reason, CheckedConsumer<Exception, IOException> handler) {
             } catch (IOException e1) {
                 logger.warn("Error while delegating exception [" + e.getMessage() + "]", e1);
             }
-        }));
+        })).run();
     }
 
     public Optional<Tuple<DataCounts, Tuple<ModelSizeStats, TimingStats>>> getStatistics(JobTask jobTask) {
@@ -1082,4 +1089,50 @@ public ByteSizeValue getOpenProcessMemoryUsage() {
         }
         return ByteSizeValue.ofBytes(memoryUsedBytes);
     }
+
+    private static class UpdateStateRetryableAction extends RetryableAction<PersistentTasksCustomMetadata.PersistentTask<?>> {
+
+        private static final int MIN_RETRY_SLEEP_MILLIS = 500;
+        private static final int RETRY_TIMEOUT_SECONDS = 30;
+        private final JobTask jobTask;
+        private final JobTaskState jobTaskState;
+
+        /**
+         * @param logger       The logger (use AutodetectProcessManager.logger)
+         * @param threadPool   The ThreadPool to schedule retries on
+         * @param jobTask      The JobTask whose state we're updating
+         * @param jobTaskState The new state to persist
+         */
+        UpdateStateRetryableAction(
+            Logger logger,
+            ThreadPool threadPool,
+            JobTask jobTask,
+            JobTaskState jobTaskState,
+            ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> delegateListener
+        ) {
+            super(
+                logger,
+                threadPool,
+                TimeValue.timeValueMillis(UpdateStateRetryableAction.MIN_RETRY_SLEEP_MILLIS),
+                TimeValue.timeValueSeconds(UpdateStateRetryableAction.RETRY_TIMEOUT_SECONDS),
+                delegateListener,
+                // executor for retries
+                threadPool.generic()
+            );
+            this.jobTask = Objects.requireNonNull(jobTask);
+            this.jobTaskState = Objects.requireNonNull(jobTaskState);
+        }
+
+        @Override
+        public void tryAction(ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener) {
+            // this will call back either onResponse(...) or onFailure(...)
+            jobTask.updatePersistentTaskState(jobTaskState, listener);
+        }
+
+        @Override
+        public boolean shouldRetry(Exception e) {
+            // retry everything *except* when the task truly no longer exists
+            return (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException) == false;
+        }
+    }
 }
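Two constants in UpdateStateRetryableAction bound the retry loop: attempts begin 500 ms apart, with the delay growing between retries, and stop after 30 seconds overall. shouldRetry treats every failure as transient except ResourceNotFoundException, which indicates the job's persistent task no longer exists and no further attempt could succeed; in that case the failure is delivered directly to the delegate listener, as the tests below verify.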

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/job/process/autodetect/AutodetectProcessManagerTests.java

Lines changed: 141 additions & 0 deletions

@@ -7,6 +7,7 @@
 package org.elasticsearch.xpack.ml.job.process.autodetect;
 
 import org.elasticsearch.ElasticsearchException;
+import org.elasticsearch.ResourceNotFoundException;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.ActionType;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
@@ -33,6 +34,7 @@
 import org.elasticsearch.index.analysis.AnalysisRegistry;
 import org.elasticsearch.indices.TestIndexNameExpressionResolver;
 import org.elasticsearch.license.XPackLicenseState;
+import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
 import org.elasticsearch.persistent.PersistentTasksService;
 import org.elasticsearch.tasks.TaskId;
 import org.elasticsearch.tasks.TaskManager;
@@ -91,6 +93,7 @@
 import java.util.Optional;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -253,6 +256,9 @@ public void setup() throws Exception {
             handler.accept(buildAutodetectParams());
             return null;
         }).when(jobResultsProvider).getAutodetectParams(any(), any(), any());
+
+        // when running retry logic use the real executor service
+        when(threadPool.generic()).thenReturn(EsExecutors.DIRECT_EXECUTOR_SERVICE);
     }
 
     public void testOpenJob() {
@@ -845,6 +851,141 @@ public void testGetOpenProcessMemoryUsage() {
         assertThat(manager.getOpenProcessMemoryUsage(), equalTo(ByteSizeValue.ofBytes(expectedSizeBytes)));
     }
 
+    public void testSetJobState_withoutHandler_invokesPersistentTaskUpdate() {
+        AutodetectProcessManager manager = createSpyManager();
+        JobTask jobTask = mock(JobTask.class);
+        when(jobTask.getAllocationId()).thenReturn(123L);
+        when(jobTask.getJobId()).thenReturn("job-123");
+
+        // call the no-handler overload
+        manager.setJobState(jobTask, JobState.CLOSING, "closing-reason");
+
+        // verify we called updatePersistentTaskState with the expected state
+        @SuppressWarnings("unchecked")
+        ArgumentCaptor<JobTaskState> stateCaptor = ArgumentCaptor.forClass(JobTaskState.class);
+        verify(jobTask).updatePersistentTaskState(stateCaptor.capture(), any());
+        JobTaskState captured = stateCaptor.getValue();
+        assertEquals(JobState.CLOSING, captured.getState());
+        assertEquals(123L, captured.getAllocationId());
+        assertEquals("closing-reason", captured.getReason());
+    }
+
+    public void testSetJobState_withHandler_onResponse_triggersHandlerNull() throws IOException {
+        // This test verifies the "happy path" of the retryable overload, i.e. what happens when the very first call
+        // to updatePersistentTaskState succeeds. On a successful state update it must invoke handler.accept(null)
+        // (because there was no error).
+        AutodetectProcessManager manager = createSpyManager();
+        JobTask jobTask = mock(JobTask.class);
+
+        // stub updatePersistentTaskState to call onResponse
+        doAnswer(invocation -> {
+            @SuppressWarnings("unchecked")
+            ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = (ActionListener<
+                PersistentTasksCustomMetadata.PersistentTask<?>>) invocation.getArguments()[1];
+            listener.onResponse(null);
+            return null;
+        }).when(jobTask).updatePersistentTaskState(any(), any());
+
+        AtomicReference<Exception> holder = new AtomicReference<>();
+        CheckedConsumer<Exception, IOException> handler = holder::set;
+
+        manager.setJobState(jobTask, JobState.FAILED, "fail-reason", handler);
+
+        // onResponse should have driven handler.accept(null)
+        assertNull(holder.get());
+        verify(jobTask).updatePersistentTaskState(any(JobTaskState.class), any());
+    }
+
+    public void testSetJobState_withHandler_onFailure_triggersHandlerException() throws IOException {
+        // Verifies that when updatePersistentTaskState reports a failure, the handler receives that exception
+        when(threadPool.schedule(any(Runnable.class), any(TimeValue.class), any(Executor.class)))
+            .thenAnswer(invocation -> {
+                Runnable r = invocation.getArgument(0);
+                r.run();
+                return mock(ThreadPool.Cancellable.class);
+            });
+        AutodetectProcessManager manager = createSpyManager();
+        JobTask jobTask = mock(JobTask.class);
+        ResourceNotFoundException boom = new ResourceNotFoundException("boom");
+        doAnswer(invocation -> {
+            @SuppressWarnings("unchecked")
+            ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener =
+                (ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>>) invocation.getArguments()[1];
+            listener.onFailure(boom);
+            return null;
+        }).when(jobTask).updatePersistentTaskState(any(), any());
+
+        AtomicReference<Exception> holder = new AtomicReference<>();
+        CheckedConsumer<Exception, IOException> handler = holder::set;
+
+        manager.setJobState(jobTask, JobState.FAILED, "fail-reason", handler);
+
+        // onFailure should have driven handler.accept(boom)
+        assertSame(boom, holder.get());
+        verify(jobTask).updatePersistentTaskState(any(JobTaskState.class), any());
+    }
+
+    public void testSetJobState_withHandler_retriesUntilSuccess() throws IOException {
+        // Verifies that transient failures are retried until eventual success, and the handler receives null on success
+
+        // ensure that all retries are executed on the same thread for determinism
+        when(threadPool.schedule(any(Runnable.class), any(TimeValue.class), any(Executor.class))).thenAnswer(invocation -> {
+            Runnable r = invocation.getArgument(0);
+            r.run();
+            return mock(ThreadPool.Cancellable.class);
+        });
+        AutodetectProcessManager manager = createSpyManager();
+        JobTask jobTask = mock(JobTask.class);
+        AtomicInteger attempts = new AtomicInteger();
+        doAnswer(invocation -> {
+            // Simulate transient failures for the first two attempts, then succeed on the third
+            @SuppressWarnings("unchecked")
+            ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = (ActionListener<
+                PersistentTasksCustomMetadata.PersistentTask<?>>) invocation.getArguments()[1];
+            if (attempts.incrementAndGet() < 3) {
+                listener.onFailure(new RuntimeException("transient failure"));
+            } else {
+                listener.onResponse(null);
+            }
+            return null;
+        }).when(jobTask).updatePersistentTaskState(any(), any());
+
+        AtomicReference<Exception> holder = new AtomicReference<>();
+        CheckedConsumer<Exception, IOException> handler = holder::set;
+
+        manager.setJobState(jobTask, JobState.OPENED, "retry-test", handler);
+
+        // confirms that the method was called exactly three times (two failures then one success)
+        verify(jobTask, times(3)).updatePersistentTaskState(any(JobTaskState.class), any());
+        assertNull(holder.get());
+    }
+
+    public void testSetJobState_withHandler_noRetryOnResourceNotFound() throws IOException {
+        // Ensures that if the persistent-state update fails with a ResourceNotFoundException, the retry loop does not
+        // retry again but immediately invokes the user's handler with that exception.
+        AutodetectProcessManager manager = createSpyManager();
+        JobTask jobTask = mock(JobTask.class);
+        ResourceNotFoundException rnfe = new ResourceNotFoundException("not found");
+        doAnswer(invocation -> {
+            // Simulate a ResourceNotFoundException that should not be retried
+            @SuppressWarnings("unchecked")
+            ActionListener<PersistentTasksCustomMetadata.PersistentTask<?>> listener = (ActionListener<
+                PersistentTasksCustomMetadata.PersistentTask<?>>) invocation.getArguments()[1];
+            listener.onFailure(rnfe);
+            return null;
+        }).when(jobTask).updatePersistentTaskState(any(), any());
+
+        AtomicReference<Exception> holder = new AtomicReference<>();
+        CheckedConsumer<Exception, IOException> handler = holder::set;
+
+        manager.setJobState(jobTask, JobState.OPENED, "rnfe-test", handler);
+
+        // updatePersistentTaskState(...) was invoked exactly once (no retries)
+        verify(jobTask, times(1)).updatePersistentTaskState(any(JobTaskState.class), any());
+        // The handler should have been invoked with the ResourceNotFoundException
+        assertSame(rnfe, holder.get());
+    }
+
     private AutodetectProcessManager createNonSpyManager(String jobId) {
         ExecutorService executorService = mock(ExecutorService.class);
         when(threadPool.executor(anyString())).thenReturn(executorService);
