From 788fb074cc580019c3d2f5ba3962bbff999946a5 Mon Sep 17 00:00:00 2001 From: Sai Sankeerth Date: Tue, 24 Dec 2024 15:34:41 +0530 Subject: [PATCH 1/3] chore: async destination reporting --- router/batchrouter/handle_async.go | 163 ++++++++++++++++++++--------- router/batchrouter/worker.go | 5 +- 2 files changed, 115 insertions(+), 53 deletions(-) diff --git a/router/batchrouter/handle_async.go b/router/batchrouter/handle_async.go index c5dfdb89d7..f8e687ee9b 100644 --- a/router/batchrouter/handle_async.go +++ b/router/batchrouter/handle_async.go @@ -43,7 +43,11 @@ func (brt *Handle) getImportingJobs(ctx context.Context, destinationID string, l } func (brt *Handle) updateJobStatuses(ctx context.Context, destinationID string, allJobs, completedJobs []*jobsdb.JobT, statusList []*jobsdb.JobStatusT) error { - reportMetrics := brt.getReportMetrics(statusList, brt.getParamertsFromJobs(allJobs)) + reportMetrics := brt.getReportMetrics(GetReportMetricsParams{ + StatusList: statusList, + ParametersMap: brt.getParamertsFromJobs(allJobs), + JobsList: allJobs, + }) parameterFilters := []jobsdb.ParameterFilterT{{Name: "destination_id", Value: destinationID}} return misc.RetryWithNotify(ctx, brt.jobsDBCommandTimeout.Load(), brt.jobdDBMaxRetries.Load(), func(ctx context.Context) error { @@ -364,7 +368,13 @@ func (brt *Handle) asyncUploadWorker(ctx context.Context) { if uploadResponse.ImportingParameters != nil && len(uploadResponse.ImportingJobIDs) > 0 { brt.asyncDestinationStruct[destinationID].UploadInProgress = true } - brt.setMultipleJobStatus(uploadResponse, true, brt.asyncDestinationStruct[destinationID].AttemptNums, brt.asyncDestinationStruct[destinationID].FirstAttemptedAts, brt.asyncDestinationStruct[destinationID].OriginalJobParameters) + brt.setMultipleJobStatus(SetMultipleJobStatusParams{ + AsyncOutput: uploadResponse, + Attempted: true, + AttemptNums: brt.asyncDestinationStruct[destinationID].AttemptNums, + FirstAttemptedAts: brt.asyncDestinationStruct[destinationID].FirstAttemptedAts, + OriginalJobParameters: brt.asyncDestinationStruct[destinationID].OriginalJobParameters, + }) brt.asyncStructCleanUp(destinationID) } brt.asyncDestinationStruct[destinationID].UploadMutex.Unlock() @@ -452,7 +462,14 @@ func (brt *Handle) sendJobsToStorage(batchJobs BatchedJobs) { out.SuccessResponse = fmt.Sprintf(`{"error":"%s"`, rterror.DisabledEgress.Error()) // skipcq: GO-R4002 } - brt.setMultipleJobStatus(out, false, getAttemptNumbers(batchJobs.Jobs), getFirstAttemptAts(batchJobs.Jobs), getOriginalJobParameters(batchJobs.Jobs)) + brt.setMultipleJobStatus(SetMultipleJobStatusParams{ + AsyncOutput: out, + Attempted: false, + AttemptNums: getAttemptNumbers(batchJobs.Jobs), + FirstAttemptedAts: getFirstAttemptAts(batchJobs.Jobs), + OriginalJobParameters: getOriginalJobParameters(batchJobs.Jobs), + JobsList: batchJobs.Jobs, + }) return } @@ -469,7 +486,14 @@ func (brt *Handle) sendJobsToStorage(batchJobs BatchedJobs) { out.FailedReason = `Jobs flowed over the prescribed limit` } - brt.setMultipleJobStatus(out, false, getAttemptNumbers(batchJobs.Jobs), getFirstAttemptAts(batchJobs.Jobs), getOriginalJobParameters(batchJobs.Jobs)) + brt.setMultipleJobStatus(SetMultipleJobStatusParams{ + AsyncOutput: out, + Attempted: false, + AttemptNums: getAttemptNumbers(batchJobs.Jobs), + FirstAttemptedAts: getFirstAttemptAts(batchJobs.Jobs), + OriginalJobParameters: getOriginalJobParameters(batchJobs.Jobs), + JobsList: batchJobs.Jobs, + }) return } } @@ -533,7 +557,16 @@ func (brt *Handle) sendJobsToStorage(batchJobs BatchedJobs) { out.FailedReason = `Jobs flowed over the prescribed limit` } - brt.setMultipleJobStatus(out, false, getAttemptNumbers(batchJobs.Jobs), getFirstAttemptAts(batchJobs.Jobs), getOriginalJobParameters(batchJobs.Jobs)) + brt.setMultipleJobStatus( + SetMultipleJobStatusParams{ + AsyncOutput: out, + Attempted: false, + AttemptNums: getAttemptNumbers(batchJobs.Jobs), + FirstAttemptedAts: getFirstAttemptAts(batchJobs.Jobs), + OriginalJobParameters: getOriginalJobParameters(batchJobs.Jobs), + JobsList: batchJobs.Jobs, + }, + ) } newAttemptNums := getAttemptNumbers(batchJobs.Jobs) @@ -560,17 +593,27 @@ func (brt *Handle) createFakeJob(jobID int64, parameters stdjson.RawMessage) *jo } } -func (brt *Handle) getReportMetrics(statusList []*jobsdb.JobStatusT, parametersMap map[int64]stdjson.RawMessage) []*utilTypes.PUReportedMetric { +type GetReportMetricsParams struct { + StatusList []*jobsdb.JobStatusT + ParametersMap map[int64]stdjson.RawMessage + JobsList []*jobsdb.JobT +} + +func (brt *Handle) getReportMetrics(params GetReportMetricsParams) []*utilTypes.PUReportedMetric { reportMetrics := make([]*utilTypes.PUReportedMetric, 0) connectionDetailsMap := make(map[string]*utilTypes.ConnectionDetails) transformedAtMap := make(map[string]string) statusDetailsMap := make(map[string]*utilTypes.StatusDetail) routerWorkspaceJobStatusCount := make(map[string]int) - for _, status := range statusList { + jobsMap := make(map[int64]*jobsdb.JobT) + for _, job := range params.JobsList { + jobsMap[job.JobID] = job + } + for _, status := range params.StatusList { var parameters routerutils.JobParameters - err := json.Unmarshal(parametersMap[status.JobID], ¶meters) + err := json.Unmarshal(params.ParametersMap[status.JobID], ¶meters) if err != nil { - brt.logger.Error("Unmarshal of job parameters failed. ", string(parametersMap[status.JobID])) + brt.logger.Error("Unmarshal of job parameters failed. ", string(params.ParametersMap[status.JobID])) } workspaceID := status.WorkspaceId eventName := parameters.EventName @@ -598,6 +641,9 @@ func (brt *Handle) getReportMetrics(statusList []*jobsdb.JobStatusT, parametersM errorCode = 0 } sampleEvent := routerutils.EmptyPayload + if job, ok := jobsMap[status.JobID]; ok { + sampleEvent = job.EventPayload + } sd = &utilTypes.StatusDetail{ Status: status.JobState, StatusCode: errorCode, @@ -647,105 +693,114 @@ func (brt *Handle) getReportMetrics(statusList []*jobsdb.JobStatusT, parametersM return reportMetrics } -func (brt *Handle) setMultipleJobStatus(asyncOutput common.AsyncUploadOutput, attempted bool, attemptNums map[int64]int, firstAttemptedAts map[int64]time.Time, originalJobParameters map[int64]stdjson.RawMessage) { - workspaceID := brt.GetWorkspaceIDForDestID(asyncOutput.DestinationID) +type SetMultipleJobStatusParams struct { + AsyncOutput common.AsyncUploadOutput + Attempted bool + AttemptNums map[int64]int + FirstAttemptedAts map[int64]time.Time + OriginalJobParameters map[int64]stdjson.RawMessage + JobsList []*jobsdb.JobT +} + +func (brt *Handle) setMultipleJobStatus(params SetMultipleJobStatusParams) { + workspaceID := brt.GetWorkspaceIDForDestID(params.AsyncOutput.DestinationID) var completedJobsList []*jobsdb.JobT var statusList []*jobsdb.JobStatusT jobIDConnectionDetailsMap := make(map[int64]jobsdb.ConnectionDetails) - if len(asyncOutput.ImportingJobIDs) > 0 { - for _, jobId := range asyncOutput.ImportingJobIDs { + if len(params.AsyncOutput.ImportingJobIDs) > 0 { + for _, jobId := range params.AsyncOutput.ImportingJobIDs { jobIDConnectionDetailsMap[jobId] = jobsdb.ConnectionDetails{ - DestinationID: asyncOutput.DestinationID, - SourceID: gjson.GetBytes(originalJobParameters[jobId], "source_id").String(), + DestinationID: params.AsyncOutput.DestinationID, + SourceID: gjson.GetBytes(params.OriginalJobParameters[jobId], "source_id").String(), } status := jobsdb.JobStatusT{ JobID: jobId, JobState: jobsdb.Importing.State, - AttemptNum: attemptNums[jobId] + 1, + AttemptNum: params.AttemptNums[jobId] + 1, ExecTime: time.Now(), RetryTime: time.Now(), ErrorCode: "200", - ErrorResponse: routerutils.EnhanceJsonWithTime(firstAttemptedAts[jobId], "firstAttemptedAt", routerutils.EmptyPayload), - Parameters: asyncOutput.ImportingParameters, - JobParameters: originalJobParameters[jobId], + ErrorResponse: routerutils.EnhanceJsonWithTime(params.FirstAttemptedAts[jobId], "firstAttemptedAt", routerutils.EmptyPayload), + Parameters: params.AsyncOutput.ImportingParameters, + JobParameters: params.OriginalJobParameters[jobId], WorkspaceId: workspaceID, } statusList = append(statusList, &status) } } - if len(asyncOutput.SucceededJobIDs) > 0 { - for _, jobId := range asyncOutput.SucceededJobIDs { + if len(params.AsyncOutput.SucceededJobIDs) > 0 { + for _, jobId := range params.AsyncOutput.SucceededJobIDs { jobIDConnectionDetailsMap[jobId] = jobsdb.ConnectionDetails{ - DestinationID: asyncOutput.DestinationID, - SourceID: gjson.GetBytes(originalJobParameters[jobId], "source_id").String(), + DestinationID: params.AsyncOutput.DestinationID, + SourceID: gjson.GetBytes(params.OriginalJobParameters[jobId], "source_id").String(), } status := jobsdb.JobStatusT{ JobID: jobId, JobState: jobsdb.Succeeded.State, - AttemptNum: attemptNums[jobId], + AttemptNum: params.AttemptNums[jobId], ExecTime: time.Now(), RetryTime: time.Now(), ErrorCode: "200", - ErrorResponse: routerutils.EnhanceJsonWithTime(firstAttemptedAts[jobId], "firstAttemptedAt", stdjson.RawMessage(asyncOutput.SuccessResponse)), + ErrorResponse: routerutils.EnhanceJsonWithTime(params.FirstAttemptedAts[jobId], "firstAttemptedAt", stdjson.RawMessage(params.AsyncOutput.SuccessResponse)), Parameters: routerutils.EmptyPayload, - JobParameters: originalJobParameters[jobId], + JobParameters: params.OriginalJobParameters[jobId], WorkspaceId: workspaceID, } statusList = append(statusList, &status) - completedJobsList = append(completedJobsList, brt.createFakeJob(jobId, originalJobParameters[jobId])) + completedJobsList = append(completedJobsList, brt.createFakeJob(jobId, params.OriginalJobParameters[jobId])) } } - if len(asyncOutput.FailedJobIDs) > 0 { - for _, jobId := range asyncOutput.FailedJobIDs { + if len(params.AsyncOutput.FailedJobIDs) > 0 { + for _, jobId := range params.AsyncOutput.FailedJobIDs { jobIDConnectionDetailsMap[jobId] = jobsdb.ConnectionDetails{ - DestinationID: asyncOutput.DestinationID, - SourceID: gjson.GetBytes(originalJobParameters[jobId], "source_id").String(), + DestinationID: params.AsyncOutput.DestinationID, + SourceID: gjson.GetBytes(params.OriginalJobParameters[jobId], "source_id").String(), } - resp := misc.UpdateJSONWithNewKeyVal(routerutils.EmptyPayload, "error", asyncOutput.FailedReason) + resp := misc.UpdateJSONWithNewKeyVal(routerutils.EmptyPayload, "error", params.AsyncOutput.FailedReason) status := jobsdb.JobStatusT{ JobID: jobId, JobState: jobsdb.Failed.State, - AttemptNum: attemptNums[jobId], + AttemptNum: params.AttemptNums[jobId], ExecTime: time.Now(), RetryTime: time.Now(), ErrorCode: "500", - ErrorResponse: routerutils.EnhanceJsonWithTime(firstAttemptedAts[jobId], "firstAttemptedAt", resp), + ErrorResponse: routerutils.EnhanceJsonWithTime(params.FirstAttemptedAts[jobId], "firstAttemptedAt", resp), Parameters: routerutils.EmptyPayload, - JobParameters: originalJobParameters[jobId], + JobParameters: params.OriginalJobParameters[jobId], WorkspaceId: workspaceID, } - if attempted { - status.AttemptNum = attemptNums[jobId] + 1 + if params.Attempted { + status.AttemptNum = params.AttemptNums[jobId] + 1 } if brt.retryLimitReached(&status) { status.JobState = jobsdb.Aborted.State - completedJobsList = append(completedJobsList, brt.createFakeJob(jobId, originalJobParameters[jobId])) + completedJobsList = append(completedJobsList, brt.createFakeJob(jobId, params.OriginalJobParameters[jobId])) } statusList = append(statusList, &status) } } - if len(asyncOutput.AbortJobIDs) > 0 { - for _, jobId := range asyncOutput.AbortJobIDs { + if len(params.AsyncOutput.AbortJobIDs) > 0 { + for _, jobId := range params.AsyncOutput.AbortJobIDs { jobIDConnectionDetailsMap[jobId] = jobsdb.ConnectionDetails{ - DestinationID: asyncOutput.DestinationID, - SourceID: gjson.GetBytes(originalJobParameters[jobId], "source_id").String(), + DestinationID: params.AsyncOutput.DestinationID, + SourceID: gjson.GetBytes(params.OriginalJobParameters[jobId], "source_id").String(), } - resp := misc.UpdateJSONWithNewKeyVal(routerutils.EmptyPayload, "error", asyncOutput.AbortReason) + resp := misc.UpdateJSONWithNewKeyVal(routerutils.EmptyPayload, "error", params.AsyncOutput.AbortReason) status := jobsdb.JobStatusT{ JobID: jobId, JobState: jobsdb.Aborted.State, - AttemptNum: attemptNums[jobId], + AttemptNum: params.AttemptNums[jobId], ExecTime: time.Now(), RetryTime: time.Now(), ErrorCode: "400", - ErrorResponse: routerutils.EnhanceJsonWithTime(firstAttemptedAts[jobId], "firstAttemptedAt", stdjson.RawMessage(resp)), + ErrorResponse: routerutils.EnhanceJsonWithTime(params.FirstAttemptedAts[jobId], "firstAttemptedAt", stdjson.RawMessage(resp)), Parameters: routerutils.EmptyPayload, - JobParameters: originalJobParameters[jobId], + JobParameters: params.OriginalJobParameters[jobId], WorkspaceId: workspaceID, } statusList = append(statusList, &status) - completedJobsList = append(completedJobsList, brt.createFakeJob(jobId, originalJobParameters[jobId])) + completedJobsList = append(completedJobsList, brt.createFakeJob(jobId, params.OriginalJobParameters[jobId])) } } @@ -756,11 +811,15 @@ func (brt *Handle) setMultipleJobStatus(asyncOutput common.AsyncUploadOutput, at parameterFilters := []jobsdb.ParameterFilterT{ { Name: "destination_id", - Value: asyncOutput.DestinationID, + Value: params.AsyncOutput.DestinationID, }, } - reportMetrics := brt.getReportMetrics(statusList, originalJobParameters) + reportMetrics := brt.getReportMetrics(GetReportMetricsParams{ + StatusList: statusList, + ParametersMap: params.OriginalJobParameters, + JobsList: params.JobsList, + }) // Mark the status of the jobs err := misc.RetryWithNotify(context.Background(), brt.jobsDBCommandTimeout.Load(), brt.jobdDBMaxRetries.Load(), func(ctx context.Context) error { @@ -794,12 +853,12 @@ func (brt *Handle) setMultipleJobStatus(asyncOutput common.AsyncUploadOutput, at brt.destType, float64(len(completedJobsList)), ) - if attempted { + if params.Attempted { var sourceID string if len(statusList) > 0 { - sourceID = gjson.GetBytes(originalJobParameters[statusList[0].JobID], "source_id").String() + sourceID = gjson.GetBytes(params.OriginalJobParameters[statusList[0].JobID], "source_id").String() } - brt.recordAsyncDestinationDeliveryStatus(sourceID, asyncOutput.DestinationID, statusList) + brt.recordAsyncDestinationDeliveryStatus(sourceID, params.AsyncOutput.DestinationID, statusList) } } diff --git a/router/batchrouter/worker.go b/router/batchrouter/worker.go index c38cc5d9c2..8e531b8563 100644 --- a/router/batchrouter/worker.go +++ b/router/batchrouter/worker.go @@ -140,7 +140,10 @@ func (w *worker) processJobAsync(jobsWg *sync.WaitGroup, destinationJobs *Destin if err != nil { panic(fmt.Errorf("storing %s jobs into ErrorDB: %w", brt.destType, err)) } - reportMetrics := brt.getReportMetrics(drainList, brt.getParamertsFromJobs(drainJobList)) + reportMetrics := brt.getReportMetrics(GetReportMetricsParams{ + StatusList: drainList, + ParametersMap: brt.getParamertsFromJobs(drainJobList), + }) err = misc.RetryWithNotify(context.Background(), brt.jobsDBCommandTimeout.Load(), brt.jobdDBMaxRetries.Load(), func(ctx context.Context) error { return brt.jobsDB.WithUpdateSafeTx(ctx, func(tx jobsdb.UpdateSafeTx) error { err := brt.jobsDB.UpdateJobStatusInTx(ctx, tx, drainList, []string{brt.destType}, parameterFilters) From 0892679bc906bd1a422e0dcb1f907212afbe89b6 Mon Sep 17 00:00:00 2001 From: Sai Sankeerth Date: Mon, 6 Jan 2025 20:23:14 +0530 Subject: [PATCH 2/3] chore: removing redundant properties --- router/batchrouter/handle_async.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/router/batchrouter/handle_async.go b/router/batchrouter/handle_async.go index f8e687ee9b..e73c16ed57 100644 --- a/router/batchrouter/handle_async.go +++ b/router/batchrouter/handle_async.go @@ -464,7 +464,6 @@ func (brt *Handle) sendJobsToStorage(batchJobs BatchedJobs) { brt.setMultipleJobStatus(SetMultipleJobStatusParams{ AsyncOutput: out, - Attempted: false, AttemptNums: getAttemptNumbers(batchJobs.Jobs), FirstAttemptedAts: getFirstAttemptAts(batchJobs.Jobs), OriginalJobParameters: getOriginalJobParameters(batchJobs.Jobs), @@ -488,7 +487,6 @@ func (brt *Handle) sendJobsToStorage(batchJobs BatchedJobs) { brt.setMultipleJobStatus(SetMultipleJobStatusParams{ AsyncOutput: out, - Attempted: false, AttemptNums: getAttemptNumbers(batchJobs.Jobs), FirstAttemptedAts: getFirstAttemptAts(batchJobs.Jobs), OriginalJobParameters: getOriginalJobParameters(batchJobs.Jobs), @@ -560,7 +558,6 @@ func (brt *Handle) sendJobsToStorage(batchJobs BatchedJobs) { brt.setMultipleJobStatus( SetMultipleJobStatusParams{ AsyncOutput: out, - Attempted: false, AttemptNums: getAttemptNumbers(batchJobs.Jobs), FirstAttemptedAts: getFirstAttemptAts(batchJobs.Jobs), OriginalJobParameters: getOriginalJobParameters(batchJobs.Jobs), From fd5959e7e64cc03fb555b5db811f08723ee39c98 Mon Sep 17 00:00:00 2001 From: Sai Sankeerth Date: Wed, 8 Jan 2025 10:50:06 +0530 Subject: [PATCH 3/3] chore: review comments --- router/batchrouter/handle_async.go | 39 +++++++++--------------------- router/batchrouter/types.go | 17 +++++++++++++ router/batchrouter/worker.go | 2 +- 3 files changed, 30 insertions(+), 28 deletions(-) diff --git a/router/batchrouter/handle_async.go b/router/batchrouter/handle_async.go index e73c16ed57..d15146fcff 100644 --- a/router/batchrouter/handle_async.go +++ b/router/batchrouter/handle_async.go @@ -12,6 +12,7 @@ import ( "time" "github.com/google/uuid" + "github.com/samber/lo" "github.com/tidwall/gjson" "github.com/rudderlabs/rudder-go-kit/stats" @@ -43,7 +44,7 @@ func (brt *Handle) getImportingJobs(ctx context.Context, destinationID string, l } func (brt *Handle) updateJobStatuses(ctx context.Context, destinationID string, allJobs, completedJobs []*jobsdb.JobT, statusList []*jobsdb.JobStatusT) error { - reportMetrics := brt.getReportMetrics(GetReportMetricsParams{ + reportMetrics := brt.getReportMetrics(getReportMetricsParams{ StatusList: statusList, ParametersMap: brt.getParamertsFromJobs(allJobs), JobsList: allJobs, @@ -368,7 +369,7 @@ func (brt *Handle) asyncUploadWorker(ctx context.Context) { if uploadResponse.ImportingParameters != nil && len(uploadResponse.ImportingJobIDs) > 0 { brt.asyncDestinationStruct[destinationID].UploadInProgress = true } - brt.setMultipleJobStatus(SetMultipleJobStatusParams{ + brt.setMultipleJobStatus(setMultipleJobStatusParams{ AsyncOutput: uploadResponse, Attempted: true, AttemptNums: brt.asyncDestinationStruct[destinationID].AttemptNums, @@ -462,7 +463,7 @@ func (brt *Handle) sendJobsToStorage(batchJobs BatchedJobs) { out.SuccessResponse = fmt.Sprintf(`{"error":"%s"`, rterror.DisabledEgress.Error()) // skipcq: GO-R4002 } - brt.setMultipleJobStatus(SetMultipleJobStatusParams{ + brt.setMultipleJobStatus(setMultipleJobStatusParams{ AsyncOutput: out, AttemptNums: getAttemptNumbers(batchJobs.Jobs), FirstAttemptedAts: getFirstAttemptAts(batchJobs.Jobs), @@ -485,7 +486,7 @@ func (brt *Handle) sendJobsToStorage(batchJobs BatchedJobs) { out.FailedReason = `Jobs flowed over the prescribed limit` } - brt.setMultipleJobStatus(SetMultipleJobStatusParams{ + brt.setMultipleJobStatus(setMultipleJobStatusParams{ AsyncOutput: out, AttemptNums: getAttemptNumbers(batchJobs.Jobs), FirstAttemptedAts: getFirstAttemptAts(batchJobs.Jobs), @@ -556,7 +557,7 @@ func (brt *Handle) sendJobsToStorage(batchJobs BatchedJobs) { } brt.setMultipleJobStatus( - SetMultipleJobStatusParams{ + setMultipleJobStatusParams{ AsyncOutput: out, AttemptNums: getAttemptNumbers(batchJobs.Jobs), FirstAttemptedAts: getFirstAttemptAts(batchJobs.Jobs), @@ -590,22 +591,15 @@ func (brt *Handle) createFakeJob(jobID int64, parameters stdjson.RawMessage) *jo } } -type GetReportMetricsParams struct { - StatusList []*jobsdb.JobStatusT - ParametersMap map[int64]stdjson.RawMessage - JobsList []*jobsdb.JobT -} - -func (brt *Handle) getReportMetrics(params GetReportMetricsParams) []*utilTypes.PUReportedMetric { +func (brt *Handle) getReportMetrics(params getReportMetricsParams) []*utilTypes.PUReportedMetric { reportMetrics := make([]*utilTypes.PUReportedMetric, 0) connectionDetailsMap := make(map[string]*utilTypes.ConnectionDetails) transformedAtMap := make(map[string]string) statusDetailsMap := make(map[string]*utilTypes.StatusDetail) routerWorkspaceJobStatusCount := make(map[string]int) - jobsMap := make(map[int64]*jobsdb.JobT) - for _, job := range params.JobsList { - jobsMap[job.JobID] = job - } + jobsMap := lo.SliceToMap(params.JobsList, func(j *jobsdb.JobT) (int64, *jobsdb.JobT) { + return j.JobID, j + }) for _, status := range params.StatusList { var parameters routerutils.JobParameters err := json.Unmarshal(params.ParametersMap[status.JobID], ¶meters) @@ -690,16 +684,7 @@ func (brt *Handle) getReportMetrics(params GetReportMetricsParams) []*utilTypes. return reportMetrics } -type SetMultipleJobStatusParams struct { - AsyncOutput common.AsyncUploadOutput - Attempted bool - AttemptNums map[int64]int - FirstAttemptedAts map[int64]time.Time - OriginalJobParameters map[int64]stdjson.RawMessage - JobsList []*jobsdb.JobT -} - -func (brt *Handle) setMultipleJobStatus(params SetMultipleJobStatusParams) { +func (brt *Handle) setMultipleJobStatus(params setMultipleJobStatusParams) { workspaceID := brt.GetWorkspaceIDForDestID(params.AsyncOutput.DestinationID) var completedJobsList []*jobsdb.JobT var statusList []*jobsdb.JobStatusT @@ -812,7 +797,7 @@ func (brt *Handle) setMultipleJobStatus(params SetMultipleJobStatusParams) { }, } - reportMetrics := brt.getReportMetrics(GetReportMetricsParams{ + reportMetrics := brt.getReportMetrics(getReportMetricsParams{ StatusList: statusList, ParametersMap: params.OriginalJobParameters, JobsList: params.JobsList, diff --git a/router/batchrouter/types.go b/router/batchrouter/types.go index 0df69805be..5635dbc5fc 100644 --- a/router/batchrouter/types.go +++ b/router/batchrouter/types.go @@ -1,10 +1,12 @@ package batchrouter import ( + stdjson "encoding/json" "time" backendconfig "github.com/rudderlabs/rudder-server/backend-config" "github.com/rudderlabs/rudder-server/jobsdb" + "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common" router_utils "github.com/rudderlabs/rudder-server/router/utils" ) @@ -59,3 +61,18 @@ type BatchedJobs struct { TimeWindow time.Time JobState string // ENUM waiting, executing, succeeded, waiting_retry, filtered, failed, aborted, migrating, migrated, wont_migrate } + +type getReportMetricsParams struct { + StatusList []*jobsdb.JobStatusT + ParametersMap map[int64]stdjson.RawMessage + JobsList []*jobsdb.JobT +} + +type setMultipleJobStatusParams struct { + AsyncOutput common.AsyncUploadOutput + Attempted bool + AttemptNums map[int64]int + FirstAttemptedAts map[int64]time.Time + OriginalJobParameters map[int64]stdjson.RawMessage + JobsList []*jobsdb.JobT +} diff --git a/router/batchrouter/worker.go b/router/batchrouter/worker.go index 8e531b8563..fbe539cf4e 100644 --- a/router/batchrouter/worker.go +++ b/router/batchrouter/worker.go @@ -140,7 +140,7 @@ func (w *worker) processJobAsync(jobsWg *sync.WaitGroup, destinationJobs *Destin if err != nil { panic(fmt.Errorf("storing %s jobs into ErrorDB: %w", brt.destType, err)) } - reportMetrics := brt.getReportMetrics(GetReportMetricsParams{ + reportMetrics := brt.getReportMetrics(getReportMetricsParams{ StatusList: drainList, ParametersMap: brt.getParamertsFromJobs(drainJobList), })