Skip to content

Commit

Permalink
✨ Add the possibility to set a timeout when waiting for job completion (#76)
Browse files Browse the repository at this point in the history
<!--
Copyright (C) 2020-2022 Arm Limited or its affiliates and Contributors.
All rights reserved.
SPDX-License-Identifier: Proprietary
-->
### Description

- Add a timeout parameter for waiting on job completion, and derive the maximum number of retries from its value


### Test Coverage

<!--
Please put an `x` in the correct box e.g. `[x]` to indicate the testing
coverage of this change.
-->

- [x]  This change is covered by existing or additional automated tests.
- [ ] Manual testing has been performed (and evidence provided) as
automated testing was not feasible.
- [ ] Additional tests are not required for this change (e.g.
documentation update).
  • Loading branch information
acabarbaye authored Jun 20, 2024
1 parent 4cadac7 commit 06e8903
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 21 deletions.
1 change: 1 addition & 0 deletions changes/20240619155022.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
:sparkles: Add the possibility to set a timeout when waiting for job completion
5 changes: 4 additions & 1 deletion utils/job/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package job

import (
"context"
"time"

"github.com/ARM-software/embedded-development-services-client-utils/utils/resource"
)
Expand Down Expand Up @@ -43,6 +44,8 @@ type IJobManager interface {
HasJobCompleted(ctx context.Context, job IAsynchronousJob) (completed bool, err error)
// HasJobStarted calls the services to determine whether the job has started.
HasJobStarted(ctx context.Context, job IAsynchronousJob) (completed bool, err error)
// WaitForJobCompletion waits for a job to complete.
// WaitForJobCompletion waits for a job to complete. Similar to WaitForJobCompletionWithTimeout but with a timeout set to 5 minutes.
WaitForJobCompletion(ctx context.Context, job IAsynchronousJob) (err error)
// WaitForJobCompletionWithTimeout waits for a job to complete, for at most the specified timeout.
WaitForJobCompletionWithTimeout(ctx context.Context, job IAsynchronousJob, timeout time.Duration) (err error)
}
38 changes: 25 additions & 13 deletions utils/job/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package job
import (
"context"
"fmt"
"math"
"net/http"
"time"

Expand Down Expand Up @@ -55,19 +56,24 @@ func (m *Manager) FetchJobMessagesFirstPage(ctx context.Context, job IAsynchrono
return
}

func waitForJobState(ctx context.Context, logger messages.IMessageLogger, job IAsynchronousJob, jobState string, checkStateFunc func(context.Context, IAsynchronousJob) (bool, error)) (err error) {
func waitForJobState(ctx context.Context, logger messages.IMessageLogger, job IAsynchronousJob, jobState string, checkStateFunc func(context.Context, IAsynchronousJob) (bool, error), timeout time.Duration) (err error) {
err = parallelisation.DetermineContextError(ctx)
if err != nil {
return
}

subCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
retryCfg := retry.DefaultExponentialBackoffRetryPolicyConfiguration()
retryCfg.RetryMax = int(float64(timeout.Milliseconds())/math.Max(float64(retryCfg.RetryWaitMin.Milliseconds()), 1)) + 1

jobName, err := job.FetchName()
if err != nil {
return
}
notStartedError := fmt.Errorf("%w: job [%v] has not reached the expected state [%v]", commonerrors.ErrCondition, jobName, jobState)
err = retry.RetryOnError(ctx, logs.NewPlainLogrLoggerFromLoggers(logger), retry.DefaultExponentialBackoffRetryPolicyConfiguration(), func() error {
inState, subErr := checkStateFunc(ctx, job)
err = retry.RetryOnError(subCtx, logs.NewPlainLogrLoggerFromLoggers(logger), retryCfg, func() error {
inState, subErr := checkStateFunc(subCtx, job)
if subErr != nil {
return subErr
}
Expand All @@ -79,12 +85,12 @@ func waitForJobState(ctx context.Context, logger messages.IMessageLogger, job IA
return
}

func (m *Manager) waitForJobToStart(ctx context.Context, logger messages.IMessageLogger, job IAsynchronousJob) error {
return waitForJobState(ctx, logger, job, "start", m.HasJobStarted)
// waitForJobToStart delegates to waitForJobState, using m.HasJobStarted as the
// state check, "start" as the state label and timeout as the wait bound.
func (m *Manager) waitForJobToStart(ctx context.Context, logger messages.IMessageLogger, job IAsynchronousJob, timeout time.Duration) error {
	// Label passed to waitForJobState as the expected job state.
	const targetState = "start"
	hasStarted := m.HasJobStarted
	return waitForJobState(ctx, logger, job, targetState, hasStarted, timeout)
}

func (m *Manager) waitForJobToHaveMessagesAvailable(ctx context.Context, logger messages.IMessageLogger, job IAsynchronousJob) error {
return waitForJobState(ctx, logger, job, "have messages", m.areThereMessages)
// waitForJobToHaveMessagesAvailable delegates to waitForJobState, using
// m.areThereMessages as the state check, "have messages" as the state label
// and timeout as the wait bound.
func (m *Manager) waitForJobToHaveMessagesAvailable(ctx context.Context, logger messages.IMessageLogger, job IAsynchronousJob, timeout time.Duration) error {
	// Label passed to waitForJobState as the expected job state.
	const targetState = "have messages"
	hasMessages := m.areThereMessages
	return waitForJobState(ctx, logger, job, targetState, hasMessages, timeout)
}

func (m *Manager) createMessagePaginator(ctx context.Context, job IAsynchronousJob) (paginator pagination.IStreamPaginatorAndPageFetcher, err error) {
Expand All @@ -94,7 +100,11 @@ func (m *Manager) createMessagePaginator(ctx context.Context, job IAsynchronousJ
return
}

func (m *Manager) WaitForJobCompletion(ctx context.Context, job IAsynchronousJob) (err error) {
// WaitForJobCompletion waits for a job to complete. It is a convenience wrapper
// around WaitForJobCompletionWithTimeout with the timeout set to 5 minutes.
func (m *Manager) WaitForJobCompletion(ctx context.Context, job IAsynchronousJob) error {
	// Timeout applied when the caller does not supply one.
	const defaultCompletionTimeout = 5 * time.Minute
	return m.WaitForJobCompletionWithTimeout(ctx, job, defaultCompletionTimeout)
}

func (m *Manager) WaitForJobCompletionWithTimeout(ctx context.Context, job IAsynchronousJob, timeout time.Duration) (err error) {
err = parallelisation.DetermineContextError(ctx)
if err != nil {
return
Expand All @@ -108,15 +118,17 @@ func (m *Manager) WaitForJobCompletion(ctx context.Context, job IAsynchronousJob
_ = messageLogger.Close()
}
}()
err = m.waitForJobToStart(ctx, messageLogger, job)
subCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
err = m.waitForJobToStart(subCtx, messageLogger, job, timeout)
if err != nil {
return
}
err = m.waitForJobToHaveMessagesAvailable(ctx, messageLogger, job)
err = m.waitForJobToHaveMessagesAvailable(subCtx, messageLogger, job, timeout)
if err != nil {
return
}
messagePaginator, err := m.createMessagePaginator(ctx, job)
messagePaginator, err := m.createMessagePaginator(subCtx, job)
if err != nil {
return
}
Expand All @@ -126,7 +138,7 @@ func (m *Manager) WaitForJobCompletion(ctx context.Context, job IAsynchronousJob
}
}()

wait, gCtx := errgroup.WithContext(ctx)
wait, gCtx := errgroup.WithContext(subCtx)
wait.Go(func() error {
return messageLogger.LogMessagesCollection(gCtx, messagePaginator)

Expand All @@ -138,7 +150,7 @@ func (m *Manager) WaitForJobCompletion(ctx context.Context, job IAsynchronousJob
if err != nil {
messageLogger.LogError(err)
}
_, err = m.HasJobCompleted(ctx, job)
_, err = m.HasJobCompleted(subCtx, job)
return
}

Expand Down
59 changes: 52 additions & 7 deletions utils/job/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
pagination2 "github.com/ARM-software/embedded-development-services-client-utils/utils/pagination"
"github.com/ARM-software/golang-utils/utils/collection/pagination"
"github.com/ARM-software/golang-utils/utils/commonerrors"
"github.com/ARM-software/golang-utils/utils/commonerrors/errortest"
"github.com/ARM-software/golang-utils/utils/field"
)

func TestManager_HasJobCompleted(t *testing.T) {
Expand Down Expand Up @@ -77,7 +79,7 @@ func TestManager_HasJobCompleted(t *testing.T) {
assert.Equal(t, test.expectCompleted, completed)
} else {
assert.Error(t, err)
assert.True(t, commonerrors.Any(err, test.expectedError))
errortest.AssertError(t, err, test.expectedError)
assert.Equal(t, test.expectCompleted, completed)
}
})
Expand Down Expand Up @@ -132,7 +134,7 @@ func TestManager_checkForMessageStreamExhaustion(t *testing.T) {
assert.True(t, messagePaginator.IsRunningDry())
} else {
assert.Error(t, err)
assert.True(t, commonerrors.Any(err, test.expectedError))
errortest.AssertError(t, err, test.expectedError)
}
})
}
Expand All @@ -149,15 +151,17 @@ func TestManager_WaitForJobCompletion(t *testing.T) {
defer goleak.VerifyNone(t)
tests := []struct {
jobFunc func() (IAsynchronousJob, error)
expectedError error
expectedError []error
timeout *time.Duration
}{
{
jobFunc: mapFunc(jobtest.NewMockFailedAsynchronousJob),
expectedError: commonerrors.ErrInvalid,
expectedError: []error{commonerrors.ErrInvalid},
},
{
jobFunc: mapFunc(jobtest.NewMockQueuedAsynchronousJob),
expectedError: commonerrors.ErrCondition,
expectedError: []error{commonerrors.ErrCondition, commonerrors.ErrTimeout, commonerrors.ErrCancelled},
timeout: field.ToOptionalDuration(500 * time.Millisecond),
},
{
jobFunc: mapFunc(jobtest.NewMockSuccessfulAsynchronousJob),
Expand All @@ -178,17 +182,58 @@ func TestManager_WaitForJobCompletion(t *testing.T) {

require.NoError(t, err)
require.NotNil(t, factory)
err = factory.WaitForJobCompletion(context.TODO(), job)
if test.timeout == nil {
err = factory.WaitForJobCompletion(context.TODO(), job)
} else {
err = factory.WaitForJobCompletionWithTimeout(context.TODO(), job, *test.timeout)
}
if test.expectedError == nil {
assert.NoError(t, err)
} else {
assert.Error(t, err)
assert.True(t, commonerrors.Any(err, test.expectedError))
errortest.AssertError(t, err, test.expectedError...)
}
})
}
}

// TestManager_WaitForJobCompletionTimeout calls WaitForJobCompletionWithTimeout
// with a one-nanosecond timeout against failed, queued and successful mock jobs,
// and expects an error matching one of ErrTimeout, ErrCancelled or ErrCondition
// in every case.
func TestManager_WaitForJobCompletionTimeout(t *testing.T) {
	defer goleak.VerifyNone(t)

	// Single duration used for the logger period, the manager back-off,
	// the paginator run-out and the wait timeout.
	const tinyDuration = time.Nanosecond

	// One mock job constructor per sub-test.
	jobFuncs := []func() (IAsynchronousJob, error){
		mapFunc(jobtest.NewMockFailedAsynchronousJob),
		mapFunc(jobtest.NewMockQueuedAsynchronousJob),
		mapFunc(jobtest.NewMockSuccessfulAsynchronousJob),
	}

	for idx, newJob := range jobFuncs {
		t.Run(fmt.Sprintf("#%v", idx), func(t *testing.T) {
			// t.Parallel()
			clientLogger, err := logging.NewStandardClientLogger(fmt.Sprintf("test #%v", idx), nil)
			require.NoError(t, err)
			loggerFactory := messages.NewMessageLoggerFactory(clientLogger, false, tinyDuration)

			// The constructor's error is handed to newMockJobManager as its
			// errToReturn argument rather than being checked here.
			job, err := newJob()
			runOut := tinyDuration
			manager, err := newMockJobManager(loggerFactory, tinyDuration, &runOut, job, err)

			require.NoError(t, err)
			require.NotNil(t, manager)

			err = manager.WaitForJobCompletionWithTimeout(context.TODO(), job, tinyDuration)
			assert.Error(t, err)
			errortest.AssertError(t, err, commonerrors.ErrTimeout, commonerrors.ErrCancelled, commonerrors.ErrCondition)
		})
	}
}

func newMockJobManager(logger *messages.MessageLoggerFactory, backOffPeriod time.Duration, messagePaginatorRunOutTimeout *time.Duration, job IAsynchronousJob, errToReturn error) (*Manager, error) {
pageNumber := rand.Intn(50) //nolint:gosec //causes G404: Use of weak random number generator
messageStream := messages.NewMockMessagePaginatorFactory(pageNumber)
Expand Down
15 changes: 15 additions & 0 deletions utils/mocks/mock_job.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 06e8903

Please sign in to comment.