@@ -67,8 +67,7 @@ public async Task StartAsync(CancellationToken cancellationToken)
6767 . Where ( a => a . Value > 0 )
6868 . ToDictionary ( a => a . Key , a => a . Value ) ;
6969
70- var scheduledWorkers = _registeredWorkers . Where ( a => queuedTasks . ContainsKey ( GetQueueTaskName ( a ) ) )
71- . ToList ( ) ;
70+ var scheduledWorkers = _registeredWorkers . Where ( a => queuedTasks . ContainsKey ( GetQueueTaskName ( a ) ) ) . ToList ( ) ;
7271
7372 currentSleepInterval = _pollTimingStrategy . CalculateDelay (
7473 queuedTasks ,
@@ -77,8 +76,7 @@ public async Task StartAsync(CancellationToken cancellationToken)
7776 currentSleepInterval
7877 ) ;
7978
80- scheduledWorkers =
81- _pollOrderStrategy . CalculateOrder ( queuedTasks , scheduledWorkers , _semaphore . CurrentCount ) ;
79+ scheduledWorkers = _pollOrderStrategy . CalculateOrder ( queuedTasks , scheduledWorkers , _semaphore . CurrentCount ) ;
8280
8381 foreach ( var scheduledWorker in scheduledWorkers )
8482 {
@@ -171,14 +169,13 @@ CancellationToken cancellationToken
171169 // TODO: Check what the operation and payload type are
172170 var externalStorageLocation = await _taskManager . GetExternalStorageLocationAsync (
173171 pollResponse . ExternalInputPayloadStoragePath ,
174- "" ,
175- "" ,
172+ "READ " ,
173+ "TASK_INPUT " ,
176174 cancellationToken
177175 ) ;
178176
179177 // TODO: iffy
180- var file = await _externalPayloadService . GetExternalStorageDataAsync ( externalStorageLocation . Path ,
181- tokenHolder . CancellationToken ) ;
178+ var file = await _externalPayloadService . GetExternalStorageDataAsync ( externalStorageLocation . Path , tokenHolder . CancellationToken ) ;
182179
183180 using TextReader textReader = new StreamReader ( file . Stream ) ;
184181 var json = await textReader . ReadToEndAsync ( ) ;
@@ -190,8 +187,7 @@ CancellationToken cancellationToken
190187 }
191188
192189 var inputType = GetInputType ( scheduledWorker . TaskType ) ;
193- var inputData = SerializationHelper . DictonaryToObject ( inputType , pollResponse . InputData ,
194- ConductorConstants . IoJsonSerializerSettings ) ;
190+ var inputData = SerializationHelper . DictonaryToObject ( inputType , pollResponse . InputData , ConductorConstants . IoJsonSerializerSettings ) ;
195191 // Poll response data can be huge (if read from external storage)
196192 // We can save memory by not holding reference to pollResponse.InputData after it is parsed
197193 pollResponse . InputData = null ;
@@ -219,9 +215,7 @@ await _taskManager.UpdateAsync(
219215 {
220216 TaskId = pollResponse . TaskId ,
221217 Status = TaskResultStatus . COMPLETED ,
222- OutputData =
223- SerializationHelper . ObjectToDictionary ( response ,
224- ConductorConstants . IoJsonSerializerSettings ) ,
218+ OutputData = SerializationHelper . ObjectToDictionary ( response , ConductorConstants . IoJsonSerializerSettings ) ,
225219 WorkflowInstanceId = pollResponse . WorkflowInstanceId
226220 } ,
227221 tokenHolder . CancellationToken
@@ -237,9 +231,7 @@ await _taskManager.UpdateAsync(
237231 pollResponse . WorkflowInstanceId
238232 ) ;
239233 }
240- catch ( OperationCanceledException ) when
241- ( cancellationToken
242- . IsCancellationRequested ) // This is fine since we know cancellationToken comes from background service
234+ catch ( OperationCanceledException ) when ( cancellationToken . IsCancellationRequested ) // This is fine since we know cancellationToken comes from background service
243235 {
244236 _logger . LogWarning (
245237 "Cancelling task {Task}(id={TaskId}) of workflow {Workflow}(id={WorkflowId}) due to background service shutdown" ,
@@ -273,8 +265,7 @@ await Task.WhenAll(
273265 TaskId = pollResponse . TaskId ,
274266 Status = TaskResultStatus . FAILED ,
275267 ReasonForIncompletion = exception . Message ,
276- OutputData = SerializationHelper . ObjectToDictionary ( errorMessage ,
277- ConductorConstants . IoJsonSerializerSettings ) ,
268+ OutputData = SerializationHelper . ObjectToDictionary ( errorMessage , ConductorConstants . IoJsonSerializerSettings ) ,
278269 WorkflowInstanceId = pollResponse ? . WorkflowInstanceId
279270 } ,
280271 tokenHolder . CancellationToken
@@ -286,4 +277,4 @@ await Task.WhenAll(
286277 }
287278 }
288279 }
289- }
280+ }
0 commit comments