@@ -130,6 +130,10 @@ private DeciderOutcome decide(final WorkflowDef def, final Workflow workflow, Li
130130 TaskDef taskDef = metadata .getTaskDef (task .getTaskDefName ());
131131 if (taskDef != null ) {
132132 checkForTimeout (taskDef , task );
133+ // If the task has not been updated for "responseTimeout" then rescheduled it.
134+ if (checkForResponseTimeout (taskDef , task )){
135+ outcome .tasksToBeRequeued .add (task );
136+ }
133137 }
134138
135139 if (!task .getStatus ().isSuccessful ()) {
@@ -374,6 +378,30 @@ void checkForTimeout(TaskDef taskType, Task task) {
374378 return ;
375379 }
376380
381+ @ VisibleForTesting
382+ boolean checkForResponseTimeout (TaskDef taskType , Task task ) {
383+
384+ if (taskType == null ){
385+ logger .warn ("missing task type " + task .getTaskDefName () + ", workflowId=" + task .getWorkflowInstanceId ());
386+ return false ;
387+ }
388+ if (task .getStatus ().isTerminal () || taskType .getTimeoutSeconds () <= 0 ||
389+ !task .getStatus ().equals (Status .IN_PROGRESS ) || taskType .getResponseTimeoutSeconds () == 0 ) {
390+ return false ;
391+ }
392+
393+ long responseTimeout = 1000 * taskType .getResponseTimeoutSeconds ();
394+ long now = System .currentTimeMillis ();
395+ long noResponseTime = now - task .getUpdateTime ();
396+
397+ if (noResponseTime < responseTimeout ) {
398+ return false ;
399+ }
400+ Monitors .recordTaskResponseTimeout (task .getTaskDefName ());
401+
402+ return true ;
403+ }
404+
377405 private List <Task > getTasksToBeScheduled (WorkflowDef def , Workflow workflow , WorkflowTask taskToSchedule , int retryCount ) {
378406 return getTasksToBeScheduled (def , workflow , taskToSchedule , retryCount , null );
379407 }
@@ -672,6 +700,8 @@ public static class DeciderOutcome {
672700 List <Task > tasksToBeScheduled = new LinkedList <>();
673701
674702 List <Task > tasksToBeUpdated = new LinkedList <>();
703+
704+ List <Task > tasksToBeRequeued = new LinkedList <>();
675705
676706 boolean isComplete ;
677707
0 commit comments