Skip to content

Commit

Permalink
Support the timeout retry policy (#70)
Browse files Browse the repository at this point in the history
  • Loading branch information
jun-he committed Aug 9, 2024
1 parent b385356 commit b8c79f7
Show file tree
Hide file tree
Showing 9 changed files with 179 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,9 @@ public enum StepInstanceAction {
STEP_INSTANCE_STATUS_TO_ACTION_MAP.put(
StepInstance.Status.PLATFORM_FAILED,
Arrays.asList(StepInstanceAction.STOP, StepInstanceAction.KILL, StepInstanceAction.SKIP));
STEP_INSTANCE_STATUS_TO_ACTION_MAP.put(
StepInstance.Status.TIMEOUT_FAILED,
Arrays.asList(StepInstanceAction.STOP, StepInstanceAction.KILL, StepInstanceAction.SKIP));

STEP_INSTANCE_STATUS_TO_ACTION_MAP.put(
StepInstance.Status.FATALLY_FAILED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ private Constants() {}
/** maximum retry wait limit for platform errors. */
public static final int MAX_PLATFORM_RETRY_LIMIT_SECS = 24 * 3600; // 1 day

/** maximum retry wait limit for timeout errors. */
public static final int MAX_TIMEOUT_RETRY_LIMIT_SECS = 24 * 3600; // 1 days

/** Max timeout limit in milliseconds. */
public static final long MAX_TIME_OUT_LIMIT_IN_MILLIS = TimeUnit.DAYS.toMillis(120); // 120 days

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ private Defaults() {}
/** Defaults for fixed retry delay for user errors. */
private static final long DEFAULT_FIXED_USER_RETRY_BACKOFF_SECS = 60L;

/** Defaults for fixed retry delay for timeout errors. */
private static final long DEFAULT_FIXED_TIMEOUT_RETRY_BACKOFF_SECS = 60L;

/** Defaults for exponential retry exponent for user errors. */
private static final int DEFAULT_ERROR_RETRY_EXPONENT = 2;

Expand All @@ -57,32 +60,47 @@ private Defaults() {}
/** Defaults for exponential max retry limit for platform errors. */
private static final long DEFAULT_PLATFORM_RETRY_LIMIT_SECS = 3600L;

/** Defaults for exponential retry exponent for timeout errors. */
private static final int DEFAULT_TIMEOUT_RETRY_EXPONENT = 2;

/** Defaults for exponential retry base backoff for timeout errors. */
private static final long DEFAULT_BASE_TIMEOUT_RETRY_BACKOFF_SECS = 60L;

/** Defaults for exponential max retry limit for timeout errors. */
private static final long DEFAULT_TIMEOUT_RETRY_LIMIT_SECS = 3600L;

/** Default Exponential backoff. */
public static final RetryPolicy.ExponentialBackoff DEFAULT_EXPONENTIAL_BACK_OFF =
RetryPolicy.ExponentialBackoff.builder()
.errorRetryExponent(DEFAULT_ERROR_RETRY_EXPONENT)
.errorRetryBackoffInSecs(DEFAULT_BASE_ERROR_RETRY_BACKOFF_SECS)
.errorRetryLimitInSecs(DEFAULT_ERROR_RETRY_LIMIT_SECS)
.platformRetryBackoffInSecs(DEFAULT_BASE_PLATFORM_RETRY_BACKOFF_SECS)
.platformRetryExponent(DEFAULT_PLATFORM_RETRY_EXPONENT)
.platformRetryBackoffInSecs(DEFAULT_BASE_PLATFORM_RETRY_BACKOFF_SECS)
.platformRetryLimitInSecs(DEFAULT_PLATFORM_RETRY_LIMIT_SECS)
.timeoutRetryExponent(DEFAULT_TIMEOUT_RETRY_EXPONENT)
.timeoutRetryBackoffInSecs(DEFAULT_BASE_TIMEOUT_RETRY_BACKOFF_SECS)
.timeoutRetryLimitInSecs(DEFAULT_TIMEOUT_RETRY_LIMIT_SECS)
.build();

/** Default Fixed backoff. */
public static final RetryPolicy.FixedBackoff DEFAULT_FIXED_BACK_OFF =
RetryPolicy.FixedBackoff.builder()
.platformRetryBackoffInSecs(DEFAULT_FIXED_PLATFORM_RETRY_BACKOFF_SECS)
.errorRetryBackoffInSecs(DEFAULT_FIXED_USER_RETRY_BACKOFF_SECS)
.platformRetryBackoffInSecs(DEFAULT_FIXED_PLATFORM_RETRY_BACKOFF_SECS)
.timeoutRetryBackoffInSecs(DEFAULT_FIXED_TIMEOUT_RETRY_BACKOFF_SECS)
.build();

private static final long DEFAULT_USER_RETRY_LIMIT = 2L;
private static final long DEFAULT_PLATFORM_RETRY_LIMIT = 10L;
private static final long DEFAULT_TIMEOUT_RETRY_LIMIT = 0L;

/** Default retry policy if unset. */
public static final RetryPolicy DEFAULT_RETRY_POLICY =
RetryPolicy.builder()
.errorRetryLimit(DEFAULT_USER_RETRY_LIMIT)
.platformRetryLimit(DEFAULT_PLATFORM_RETRY_LIMIT)
.timeoutRetryLimit(DEFAULT_TIMEOUT_RETRY_LIMIT)
.backoff(DEFAULT_EXPONENTIAL_BACK_OFF)
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
@JsonNaming(PropertyNamingStrategy.SnakeCaseStrategy.class)
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder(
value = {"error_retry_limit", "platform_retry_limit", "backoff"},
value = {"error_retry_limit", "platform_retry_limit", "timeout_retry_limit", "backoff"},
alphabetic = true)
@JsonDeserialize(builder = RetryPolicy.RetryPolicyBuilder.class)
@Getter
Expand All @@ -46,6 +46,9 @@ public class RetryPolicy {
@Max(Constants.MAX_RETRY_LIMIT)
private final Long platformRetryLimit;

@Max(Constants.MAX_RETRY_LIMIT)
private final Long timeoutRetryLimit;

/** Backoff strategy. */
private final Backoff backoff;

Expand All @@ -67,7 +70,7 @@ public static BackoffPolicyType create(String type) {
* Merge a given step retry policy with DEFAULT RETRY POLICY.
*
* @param policy retry policy
* @return retry policy
* @return final retry policy
*/
public static RetryPolicy tryMergeWithDefault(RetryPolicy policy) {
RetryPolicy defaultRetryPolicy = Defaults.DEFAULT_RETRY_POLICY;
Expand All @@ -82,6 +85,10 @@ public static RetryPolicy tryMergeWithDefault(RetryPolicy policy) {
if (retryPolicyBuilder.platformRetryLimit == null) {
retryPolicyBuilder.platformRetryLimit = defaultRetryPolicy.platformRetryLimit;
}
// Merge from default.
if (retryPolicyBuilder.timeoutRetryLimit == null) {
retryPolicyBuilder.timeoutRetryLimit = defaultRetryPolicy.timeoutRetryLimit;
}
if (retryPolicyBuilder.backoff == null) {
retryPolicyBuilder.backoff = defaultRetryPolicy.backoff;
} else {
Expand Down Expand Up @@ -112,6 +119,9 @@ public interface Backoff {
/** Get next retry delay for platform errors. */
int getNextRetryDelayForPlatformError(long platformRetries);

/** Get next retry delay for timeout errors. */
int getNextRetryDelayForTimeoutError(long timeoutRetries);

/** Merge with default and get new backoff. */
Backoff mergeWithDefault();
}
Expand All @@ -127,7 +137,10 @@ public interface Backoff {
"error_retry_limit_in_secs",
"platform_retry_backoff_in_secs",
"platform_retry_exponent",
"platform_retry_limit_in_secs"
"platform_retry_limit_in_secs",
"timeout_retry_backoff_in_secs",
"timeout_retry_exponent",
"timeout_retry_limit_in_secs"
},
alphabetic = true)
@JsonDeserialize(builder = ExponentialBackoff.ExponentialBackoffBuilder.class)
Expand Down Expand Up @@ -156,6 +169,17 @@ public static class ExponentialBackoff implements Backoff {
@Max(Constants.MAX_PLATFORM_RETRY_LIMIT_SECS)
private final Long platformRetryLimitInSecs;

/** Base time in seconds to wait between retries for timeout errors. */
@Max(Constants.MAX_TIMEOUT_RETRY_LIMIT_SECS)
private final Long timeoutRetryBackoffInSecs;

/** Base exponent for timeout errors. */
private final Integer timeoutRetryExponent;

/** Max time in seconds to wait between retries for timeout errors. */
@Max(Constants.MAX_TIMEOUT_RETRY_LIMIT_SECS)
private final Long timeoutRetryLimitInSecs;

@Override
public BackoffPolicyType getType() {
return BackoffPolicyType.EXPONENTIAL_BACKOFF;
Expand All @@ -174,6 +198,13 @@ public int getNextRetryDelayForPlatformError(long platformRetries) {
return (int) Math.min(waitVal, platformRetryLimitInSecs);
}

@Override
public int getNextRetryDelayForTimeoutError(long timeoutRetries) {
long waitVal =
(long) (timeoutRetryBackoffInSecs * Math.pow(timeoutRetryExponent, timeoutRetries));
return (int) Math.min(waitVal, timeoutRetryLimitInSecs);
}

@Override
public Backoff mergeWithDefault() {
RetryPolicy.ExponentialBackoff defaultExponentialBackoff =
Expand Down Expand Up @@ -203,6 +234,18 @@ public Backoff mergeWithDefault() {
exponentialBackoffBuilder.platformRetryExponent =
defaultExponentialBackoff.platformRetryExponent;
}
if (exponentialBackoffBuilder.timeoutRetryBackoffInSecs == null) {
exponentialBackoffBuilder.timeoutRetryBackoffInSecs =
defaultExponentialBackoff.timeoutRetryBackoffInSecs;
}
if (exponentialBackoffBuilder.timeoutRetryLimitInSecs == null) {
exponentialBackoffBuilder.timeoutRetryLimitInSecs =
defaultExponentialBackoff.timeoutRetryLimitInSecs;
}
if (exponentialBackoffBuilder.timeoutRetryExponent == null) {
exponentialBackoffBuilder.timeoutRetryExponent =
defaultExponentialBackoff.timeoutRetryExponent;
}
return exponentialBackoffBuilder.build();
}

Expand All @@ -217,7 +260,11 @@ public static final class ExponentialBackoffBuilder {}
@JsonNaming(PropertyNamingStrategy.SnakeCaseStrategy.class)
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder(
value = {"error_retry_backoff_in_secs", "platform_retry_backoff_in_secs"},
value = {
"error_retry_backoff_in_secs",
"platform_retry_backoff_in_secs",
"timeout_retry_backoff_in_secs"
},
alphabetic = true)
@JsonDeserialize(builder = FixedBackoff.FixedBackoffBuilder.class)
@Getter
Expand All @@ -229,6 +276,9 @@ public static class FixedBackoff implements Backoff {
/** Constant wait between platform retries. */
private final Long platformRetryBackoffInSecs;

/** Constant wait between timeout retries. */
private final Long timeoutRetryBackoffInSecs;

@Override
public BackoffPolicyType getType() {
return BackoffPolicyType.FIXED_BACKOFF;
Expand All @@ -244,6 +294,11 @@ public int getNextRetryDelayForPlatformError(long platformRetries) {
return platformRetryBackoffInSecs.intValue();
}

@Override
public int getNextRetryDelayForTimeoutError(long timeoutRetries) {
return timeoutRetryBackoffInSecs.intValue();
}

@Override
public Backoff mergeWithDefault() {
RetryPolicy.FixedBackoff defaultFixedBackoff = Defaults.DEFAULT_FIXED_BACK_OFF;
Expand All @@ -255,6 +310,10 @@ public Backoff mergeWithDefault() {
fixedBackoffBuilder.platformRetryBackoffInSecs =
defaultFixedBackoff.platformRetryBackoffInSecs;
}
if (fixedBackoffBuilder.timeoutRetryBackoffInSecs == null) {
fixedBackoffBuilder.timeoutRetryBackoffInSecs =
defaultFixedBackoff.timeoutRetryBackoffInSecs;
}
return fixedBackoffBuilder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ public StepDependencies getSignalDependencies() {
"error_retry_limit",
"platform_retries",
"platform_retry_limit",
"timeout_retries",
"timeout_retry_limit",
"manual_retries",
"retryable",
"backoff"
Expand All @@ -162,6 +164,8 @@ public static class StepRetry {
private long errorRetryLimit;
private long platformRetries; // retry count due to platform failure
private long platformRetryLimit;
private long timeoutRetries; // retry count due to timeout failure
private long timeoutRetryLimit;
private long manualRetries; // retry count due to manual api call to restart
private boolean retryable = true; // mark if the step is retryable by the system
private RetryPolicy.Backoff backoff;
Expand All @@ -176,12 +180,19 @@ public boolean hasReachedPlatformRetryLimit() {
return platformRetries >= platformRetryLimit || !retryable;
}

/** check if reaching timeout retry limit. */
public boolean hasReachedTimeoutRetryLimit() {
return timeoutRetries >= timeoutRetryLimit || !retryable;
}

/** increment corresponding retry count based on status. */
public void incrementByStatus(Status status) {
if (status == Status.USER_FAILED) {
errorRetries++;
} else if (status == Status.PLATFORM_FAILED) {
platformRetries++;
} else if (status == Status.TIMEOUT_FAILED) {
timeoutRetries++;
} else if (status.isRestartable()) {
manualRetries++;
} else {
Expand All @@ -193,14 +204,16 @@ public void incrementByStatus(Status status) {
/**
* Get next retry delay based on error and configured retry policy.
*
* @param status status
* @return delay for the next attempt
* @param status the step instance status
* @return the next retry delay in secs.
*/
public int getNextRetryDelay(Status status) {
if (status == Status.USER_FAILED) {
return backoff.getNextRetryDelayForUserError(errorRetries);
} else if (status == Status.PLATFORM_FAILED) {
return backoff.getNextRetryDelayForPlatformError(platformRetries);
} else if (status == Status.TIMEOUT_FAILED) {
return backoff.getNextRetryDelayForTimeoutError(timeoutRetries);
} else {
// Not expected to get retry delay for any other errors.
throw new MaestroInvalidStatusException(
Expand All @@ -214,6 +227,7 @@ public static StepRetry from(@Nullable RetryPolicy policy) {
StepRetry stepRetry = new StepRetry();
stepRetry.errorRetryLimit = retryPolicy.getErrorRetryLimit();
stepRetry.platformRetryLimit = retryPolicy.getPlatformRetryLimit();
stepRetry.timeoutRetryLimit = retryPolicy.getTimeoutRetryLimit();
stepRetry.retryable = true;
stepRetry.backoff = retryPolicy.getBackoff();
return stepRetry;
Expand Down Expand Up @@ -281,7 +295,9 @@ public enum Status {

/** Step is stopped by a user or the workflow, terminal state. */
STOPPED(true, false, false, false),
/** Step is timed out by the system, terminal state. */
/** Step is failed due to execution timeout error, terminal state. */
TIMEOUT_FAILED(true, false, true, true),
/** Step is fatally timed out by the system, terminal state. */
TIMED_OUT(true, false, false, false);

@JsonIgnore private final boolean terminal; // if it is terminal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -542,11 +542,12 @@ private boolean isStepSatisfied(
return satisfied;
}

/** Check if timed out, which is based on step start time. */
/** Check if the execution is timed out, which is based on step start time. */
private boolean isTimeout(StepRuntimeSummary runtimeSummary) {
if (runtimeSummary.getRuntimeState() != null
&& runtimeSummary.getRuntimeState().getStartTime() != null) {
if (runtimeSummary.getRuntimeState().getStatus() == StepInstance.Status.TIMED_OUT) {
if (runtimeSummary.getRuntimeState().getStatus() == StepInstance.Status.TIMED_OUT
|| runtimeSummary.getRuntimeState().getStatus() == StepInstance.Status.TIMEOUT_FAILED) {
return true;
}
long timeoutInMillis =
Expand Down Expand Up @@ -650,12 +651,7 @@ public boolean execute(Workflow workflow, Task task, WorkflowExecutor executor)
runtimeSummary.getRuntimeState().getStatus().name());

if (isTimeout(runtimeSummary)) {
LOG.info(
"Workflow instance {}'s step {} is timed out.",
workflowSummary.getIdentity(),
runtimeSummary.getIdentity());
terminate(workflowSummary, runtimeSummary, StepInstance.Status.TIMED_OUT);
runtimeSummary.addTimeline(TimelineLogEvent.info("Step instance is timed out."));
handleTimeoutError(workflowSummary, runtimeSummary);
} else {
tryUpdateByAction(workflowSummary, stepDefinition, runtimeSummary);
}
Expand Down Expand Up @@ -685,6 +681,20 @@ public boolean execute(Workflow workflow, Task task, WorkflowExecutor executor)
}
}

private void handleTimeoutError(
WorkflowSummary workflowSummary, StepRuntimeSummary runtimeSummary) {
LOG.info(
"Workflow instance {}'s step {} is timed out.",
workflowSummary.getIdentity(),
runtimeSummary.getIdentity());
if (runtimeSummary.getStepRetry().hasReachedTimeoutRetryLimit()) {
terminate(workflowSummary, runtimeSummary, StepInstance.Status.TIMED_OUT);
runtimeSummary.addTimeline(TimelineLogEvent.info("Step instance is timed out."));
} else {
runtimeSummary.markTerminated(StepInstance.Status.TIMEOUT_FAILED, tracingManager);
}
}

// Figure out if an exception is retryable
private boolean isRetryableError(Exception e) {
if (e.getCause() instanceof SQLException) {
Expand Down Expand Up @@ -837,6 +847,7 @@ private boolean doExecute(
case INTERNALLY_FAILED: // Ignoring failure model as the error happens within Maestro
case USER_FAILED:
case PLATFORM_FAILED:
case TIMEOUT_FAILED:
case STOPPED:
case TIMED_OUT:
doneWithExecute = true;
Expand Down Expand Up @@ -904,7 +915,8 @@ private void terminateAllSteps(Workflow workflow, WorkflowSummary summary, Strin
void updateRetryDelayTimeToTimeline(StepRuntimeSummary runtimeSummary) {
StepInstance.Status status = runtimeSummary.getRuntimeState().getStatus();
if (status == StepInstance.Status.USER_FAILED
|| status == StepInstance.Status.PLATFORM_FAILED) {
|| status == StepInstance.Status.PLATFORM_FAILED
|| status == StepInstance.Status.TIMEOUT_FAILED) {
int nextRetryDelayInSecs =
runtimeSummary
.getStepRetry()
Expand Down Expand Up @@ -1069,6 +1081,7 @@ private void deriveTaskStatus(Task task, StepRuntimeSummary runtimeSummary) {
break;
case USER_FAILED:
case PLATFORM_FAILED:
case TIMEOUT_FAILED:
task.setStatus(Task.Status.FAILED);
task.setStartDelayInSeconds(
runtimeSummary
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ public final class AggregatedViewHelper {
StepInstance.Status.USER_FAILED, WorkflowInstance.Status.FAILED);
STEP_INSTANCE_STATUS_TO_WORKFLOW_INSTANCE_STATUS.put(
StepInstance.Status.PLATFORM_FAILED, WorkflowInstance.Status.FAILED);
STEP_INSTANCE_STATUS_TO_WORKFLOW_INSTANCE_STATUS.put(
StepInstance.Status.TIMEOUT_FAILED, WorkflowInstance.Status.FAILED);

// STOPPED statuses
STEP_INSTANCE_STATUS_TO_WORKFLOW_INSTANCE_STATUS.put(
Expand Down
Loading

0 comments on commit b8c79f7

Please sign in to comment.