Skip to content

Manage AWS Batch Unscheduled jobs #5936

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

5 changes: 5 additions & 0 deletions docs/reference/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,11 @@ The following settings are available:
:::
: The share identifier for all tasks when using [fair-share scheduling for AWS Batch](https://aws.amazon.com/blogs/hpc/introducing-fair-share-scheduling-for-aws-batch/)

`aws.batch.terminateUnschedulableJobs`
: :::{versionadded} 25.03.0-edge
:::
: When `true`, jobs that cannot be scheduled for lack of resources or misconfiguration are terminated automatically (default: `false`). The pipeline may complete with an error status depending on the error strategy defined for the corresponding jobs.

`aws.batch.volumes`
: One or more container mounts. Mounts can be specified as simple e.g. `/some/path` or canonical format e.g. `/host/path:/mount/path[:ro|rw]`. Multiple mounts can be specified separating them with a comma or using a list object.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ Max number of downloads attempts from S3 (default: `1`).
""")
public String shareIdentifier;

@ConfigOption
@Description("""
When true, jobs that cannot be scheduled for lack of resources or misconfiguration are terminated automatically (default: `false`).
""")
public boolean terminateUnschedulableJobs;

@ConfigOption
@Description("""
One or more container mounts. Mounts can be specified as simple e.g. `/some/path` or canonical format e.g. `/host/path:/mount/path[:ro|rw]`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ class AwsBatchTaskHandler extends TaskHandler implements BatchHandler<String,Job

final static private Map<String,String> jobDefinitions = [:]

final static private List<String> MISCONFIGURATION_REASONS = List.of(
"MISCONFIGURATION:JOB_RESOURCE_REQUIREMENT",
"MISCONFIGURATION:COMPUTE_ENVIRONMENT_MAX_RESOURCE"
)

/**
* Batch context shared between multiple task handlers
*/
Expand Down Expand Up @@ -232,12 +237,40 @@ class AwsBatchTaskHandler extends TaskHandler implements BatchHandler<String,Job
final result = job?.status in ['RUNNING', 'SUCCEEDED', 'FAILED']
if( result )
this.status = TaskStatus.RUNNING
else
checkIfUnschedulable(job)
// fetch the task arn
if( !taskArn )
taskArn = job?.getContainer()?.getTaskArn()
return result
}

protected void checkIfUnschedulable(JobDetail job) {
if( job ) try {
checkIfUnschedulable0(job)
}
catch (Throwable e) {
log.warn "Unable to check if job is unschedulable - ${e.message}", e
}
}

private void checkIfUnschedulable0(JobDetail job) {
final reason = errReason(job)
if( MISCONFIGURATION_REASONS.any((it) -> reason.contains(it)) ) {
final msg = "unschedulable AWS Batch job ${jobId} (${task.lazyName()}) - $reason"
// If indicated in aws.batch config kill the job an produce a failure
if( executor.awsOptions.terminateUnschedulableJobs() ){
log.warn("Terminating ${jobId}")
kill()
task.error = new ProcessException("Unschedulable AWS Batch job ${jobId} - $reason")
status = TaskStatus.COMPLETED
}
else {
log.warn "Detected $msg"
}
}
}

protected String errReason(JobDetail job){
if(!job)
return "(unknown)"
Expand All @@ -256,6 +289,10 @@ class AwsBatchTaskHandler extends TaskHandler implements BatchHandler<String,Job
@Override
boolean checkIfCompleted() {
assert jobId
if( isCompleted() ) {
//Task can be marked as completed before running by unschedulable reason. Return true
return true
}
if( !isRunning() )
return false
final job = describeJob(jobId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,4 +161,8 @@ class AwsOptions implements CloudTransferOptions {
return awsConfig.batchConfig.getExecutionRole()
}

boolean terminateUnschedulableJobs() {
return awsConfig.batchConfig.terminateUnschedulableJobs
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ class AwsBatchConfig implements CloudTransferOptions {
*/
boolean fargateMode

/**
* Flag to fail and terminate unscheduled jobs.
*/
boolean terminateUnschedulableJobs

/*
* only for testing
*/
Expand All @@ -112,11 +117,11 @@ class AwsBatchConfig implements CloudTransferOptions {
shareIdentifier = opts.shareIdentifier
schedulingPriority = opts.schedulingPriority as Integer ?: 0
executionRole = opts.executionRole
terminateUnschedulableJobs = opts.terminateUnschedulableJobs as boolean
if( retryMode == 'built-in' )
retryMode = null // this force falling back on NF built-in retry mode instead of delegating to AWS CLI tool
if( retryMode && retryMode !in AwsOptions.VALID_RETRY_MODES )
log.warn "Unexpected value for 'aws.batch.retryMode' config setting - offending value: $retryMode - valid values: ${AwsOptions.VALID_RETRY_MODES.join(',')}"

}

// ==== getters =====
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class AwsBatchConfigTest extends Specification {
!batch.isFargateMode()
!batch.s5cmdPath
batch.schedulingPriority == 0
!batch.terminateUnschedulableJobs
}

def 'should create config with options' () {
Expand Down Expand Up @@ -139,4 +140,17 @@ class AwsBatchConfigTest extends Specification {
[platformType: 'fargate', cliPath: "/opt/s5cmd --foo"] | null | '/opt/s5cmd --foo'| true
}

def 'should parse unschedulable flag' () {
given:
def opts = new AwsBatchConfig(OPTS)

expect:
opts.terminateUnschedulableJobs == UNSCHEDULABLE

where:
OPTS | UNSCHEDULABLE
[:] | false
[terminateUnschedulableJobs: false] | false
[terminateUnschedulableJobs: true] | true
}
}