Skip to content

Commit

Permalink
Updating Job Watcher service (#962)
Browse files Browse the repository at this point in the history
Changes done:
Updating JobWatching service to check the job state as Preempted or errored.
This is one of the edge case which is to check whether the current job is errored or preempted, and stop the worker job all together.

Earlier, the JobWatching service was exiting the system in case job is cancelled.
  • Loading branch information
abhgarg-fb authored Feb 4, 2021
1 parent 893c3ec commit 6080562
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ public class EventCode {
public static final EventCode WORKER_JOB_FINISHED = new EventCode("WORKER_JOB_FINISHED");
public static final EventCode WORKER_JOB_STARTED = new EventCode("WORKER_JOB_STARTED");

public static final EventCode WATCHING_SERVICE_JOB_ERRORED = new EventCode("WATCHING_SERVICE_JOB_ERRORED");
public static final EventCode WATCHING_SERVICE_JOB_PREEMPTED = new EventCode("WATCHING_SERVICE_JOB_PREEMPTED");

public static final EventCode COPIER_FINISHED_EXPORT = new EventCode("COPIER_FINISHED_EXPORT");
public static final EventCode COPIER_FINISHED_IMPORT = new EventCode("COPIER_FINISHED_IMPORT");
public static final EventCode COPIER_STARTED_EXPORT = new EventCode("COPIER_STARTED_EXPORT");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,25 +47,53 @@ protected void runOneIteration() {
}
monitor.debug(() -> "polling for job to check cancellation");
PortabilityJob currentJob = store.findJob(JobMetadata.getJobId());
boolean isCanceled = currentJob.state() == PortabilityJob.State.CANCELED;
if (isCanceled) {
monitor.info(
() -> String.format("Job %s is canceled", JobMetadata.getJobId()),
EventCode.WORKER_JOB_CANCELED);
dtpInternalMetricRecorder.cancelledJob(
JobMetadata.getDataType(),
JobMetadata.getExportService(),
JobMetadata.getImportService(),
JobMetadata.getStopWatch().elapsed());
monitor.flushLogs();
System.exit(0);
} else {
monitor.debug(() -> String.format("Job %s is not canceled", JobMetadata.getJobId()));
switch (currentJob.state()) {
case CANCELED:
monitor.info(
() -> String.format("Job %s is canceled", JobMetadata.getJobId()),
EventCode.WORKER_JOB_CANCELED);
dtpInternalMetricRecorder.cancelledJob(
JobMetadata.getDataType(),
JobMetadata.getExportService(),
JobMetadata.getImportService(),
JobMetadata.getStopWatch().elapsed());
monitor.flushLogs();
System.exit(0);
break;
case ERROR:
monitor.severe(
() -> String.format("Job %s is errored", JobMetadata.getJobId()),
EventCode.WATCHING_SERVICE_JOB_ERRORED);
recordGeneralMetric(PortabilityJob.State.ERROR.toString());
monitor.flushLogs();
System.exit(0);
break;
case PREEMPTED:
monitor.info(
() -> String.format("Job %s is preempted", JobMetadata.getJobId()),
EventCode.WATCHING_SERVICE_JOB_PREEMPTED);
recordGeneralMetric(PortabilityJob.State.PREEMPTED.toString());
monitor.flushLogs();
System.exit(0);
break;
default:
monitor.debug(
() ->
String.format(
"Job %s is not canceled or errored or preempted", JobMetadata.getJobId()));
}
}

@Override
protected Scheduler scheduler() {
return scheduler;
}

private void recordGeneralMetric(String jobState) {
dtpInternalMetricRecorder.recordGenericMetric(
JobMetadata.getDataType(),
JobMetadata.getExportService(),
jobState,
JobMetadata.getStopWatch().elapsed());
}
}

0 comments on commit 6080562

Please sign in to comment.