diff --git a/router/batchrouter/handle_async.go b/router/batchrouter/handle_async.go index c5dfdb89d7..d15146fcff 100644 --- a/router/batchrouter/handle_async.go +++ b/router/batchrouter/handle_async.go @@ -12,6 +12,7 @@ import ( "time" "github.com/google/uuid" + "github.com/samber/lo" "github.com/tidwall/gjson" "github.com/rudderlabs/rudder-go-kit/stats" @@ -43,7 +44,11 @@ func (brt *Handle) getImportingJobs(ctx context.Context, destinationID string, l } func (brt *Handle) updateJobStatuses(ctx context.Context, destinationID string, allJobs, completedJobs []*jobsdb.JobT, statusList []*jobsdb.JobStatusT) error { - reportMetrics := brt.getReportMetrics(statusList, brt.getParamertsFromJobs(allJobs)) + reportMetrics := brt.getReportMetrics(getReportMetricsParams{ + StatusList: statusList, + ParametersMap: brt.getParamertsFromJobs(allJobs), + JobsList: allJobs, + }) parameterFilters := []jobsdb.ParameterFilterT{{Name: "destination_id", Value: destinationID}} return misc.RetryWithNotify(ctx, brt.jobsDBCommandTimeout.Load(), brt.jobdDBMaxRetries.Load(), func(ctx context.Context) error { @@ -364,7 +369,13 @@ func (brt *Handle) asyncUploadWorker(ctx context.Context) { if uploadResponse.ImportingParameters != nil && len(uploadResponse.ImportingJobIDs) > 0 { brt.asyncDestinationStruct[destinationID].UploadInProgress = true } - brt.setMultipleJobStatus(uploadResponse, true, brt.asyncDestinationStruct[destinationID].AttemptNums, brt.asyncDestinationStruct[destinationID].FirstAttemptedAts, brt.asyncDestinationStruct[destinationID].OriginalJobParameters) + brt.setMultipleJobStatus(setMultipleJobStatusParams{ + AsyncOutput: uploadResponse, + Attempted: true, + AttemptNums: brt.asyncDestinationStruct[destinationID].AttemptNums, + FirstAttemptedAts: brt.asyncDestinationStruct[destinationID].FirstAttemptedAts, + OriginalJobParameters: brt.asyncDestinationStruct[destinationID].OriginalJobParameters, + }) brt.asyncStructCleanUp(destinationID) } brt.asyncDestinationStruct[destinationID].UploadMutex.Unlock() @@ -452,7 +463,13 @@ func (brt *Handle) sendJobsToStorage(batchJobs BatchedJobs) { out.SuccessResponse = fmt.Sprintf(`{"error":"%s"`, rterror.DisabledEgress.Error()) // skipcq: GO-R4002 } - brt.setMultipleJobStatus(out, false, getAttemptNumbers(batchJobs.Jobs), getFirstAttemptAts(batchJobs.Jobs), getOriginalJobParameters(batchJobs.Jobs)) + brt.setMultipleJobStatus(setMultipleJobStatusParams{ + AsyncOutput: out, + AttemptNums: getAttemptNumbers(batchJobs.Jobs), + FirstAttemptedAts: getFirstAttemptAts(batchJobs.Jobs), + OriginalJobParameters: getOriginalJobParameters(batchJobs.Jobs), + JobsList: batchJobs.Jobs, + }) return } @@ -469,7 +486,13 @@ func (brt *Handle) sendJobsToStorage(batchJobs BatchedJobs) { out.FailedReason = `Jobs flowed over the prescribed limit` } - brt.setMultipleJobStatus(out, false, getAttemptNumbers(batchJobs.Jobs), getFirstAttemptAts(batchJobs.Jobs), getOriginalJobParameters(batchJobs.Jobs)) + brt.setMultipleJobStatus(setMultipleJobStatusParams{ + AsyncOutput: out, + AttemptNums: getAttemptNumbers(batchJobs.Jobs), + FirstAttemptedAts: getFirstAttemptAts(batchJobs.Jobs), + OriginalJobParameters: getOriginalJobParameters(batchJobs.Jobs), + JobsList: batchJobs.Jobs, + }) return } } @@ -533,7 +556,15 @@ func (brt *Handle) sendJobsToStorage(batchJobs BatchedJobs) { out.FailedReason = `Jobs flowed over the prescribed limit` } - brt.setMultipleJobStatus(out, false, getAttemptNumbers(batchJobs.Jobs), getFirstAttemptAts(batchJobs.Jobs), getOriginalJobParameters(batchJobs.Jobs)) + brt.setMultipleJobStatus( + setMultipleJobStatusParams{ + AsyncOutput: out, + AttemptNums: getAttemptNumbers(batchJobs.Jobs), + FirstAttemptedAts: getFirstAttemptAts(batchJobs.Jobs), + OriginalJobParameters: getOriginalJobParameters(batchJobs.Jobs), + JobsList: batchJobs.Jobs, + }, + ) } newAttemptNums := getAttemptNumbers(batchJobs.Jobs) @@ -560,17 +591,20 @@ func (brt *Handle) createFakeJob(jobID int64, parameters stdjson.RawMessage) *jo } } -func (brt *Handle) getReportMetrics(statusList []*jobsdb.JobStatusT, parametersMap map[int64]stdjson.RawMessage) []*utilTypes.PUReportedMetric { +func (brt *Handle) getReportMetrics(params getReportMetricsParams) []*utilTypes.PUReportedMetric { reportMetrics := make([]*utilTypes.PUReportedMetric, 0) connectionDetailsMap := make(map[string]*utilTypes.ConnectionDetails) transformedAtMap := make(map[string]string) statusDetailsMap := make(map[string]*utilTypes.StatusDetail) routerWorkspaceJobStatusCount := make(map[string]int) - for _, status := range statusList { + jobsMap := lo.SliceToMap(params.JobsList, func(j *jobsdb.JobT) (int64, *jobsdb.JobT) { + return j.JobID, j + }) + for _, status := range params.StatusList { var parameters routerutils.JobParameters - err := json.Unmarshal(parametersMap[status.JobID], ¶meters) + err := json.Unmarshal(params.ParametersMap[status.JobID], ¶meters) if err != nil { - brt.logger.Error("Unmarshal of job parameters failed. ", string(parametersMap[status.JobID])) + brt.logger.Error("Unmarshal of job parameters failed. ", string(params.ParametersMap[status.JobID])) } workspaceID := status.WorkspaceId eventName := parameters.EventName @@ -598,6 +632,9 @@ func (brt *Handle) getReportMetrics(statusList []*jobsdb.JobStatusT, parametersM errorCode = 0 } sampleEvent := routerutils.EmptyPayload + if job, ok := jobsMap[status.JobID]; ok { + sampleEvent = job.EventPayload + } sd = &utilTypes.StatusDetail{ Status: status.JobState, StatusCode: errorCode, @@ -647,105 +684,105 @@ func (brt *Handle) getReportMetrics(statusList []*jobsdb.JobStatusT, parametersM return reportMetrics } -func (brt *Handle) setMultipleJobStatus(asyncOutput common.AsyncUploadOutput, attempted bool, attemptNums map[int64]int, firstAttemptedAts map[int64]time.Time, originalJobParameters map[int64]stdjson.RawMessage) { - workspaceID := brt.GetWorkspaceIDForDestID(asyncOutput.DestinationID) +func (brt *Handle) setMultipleJobStatus(params setMultipleJobStatusParams) { + workspaceID := brt.GetWorkspaceIDForDestID(params.AsyncOutput.DestinationID) var completedJobsList []*jobsdb.JobT var statusList []*jobsdb.JobStatusT jobIDConnectionDetailsMap := make(map[int64]jobsdb.ConnectionDetails) - if len(asyncOutput.ImportingJobIDs) > 0 { - for _, jobId := range asyncOutput.ImportingJobIDs { + if len(params.AsyncOutput.ImportingJobIDs) > 0 { + for _, jobId := range params.AsyncOutput.ImportingJobIDs { jobIDConnectionDetailsMap[jobId] = jobsdb.ConnectionDetails{ - DestinationID: asyncOutput.DestinationID, - SourceID: gjson.GetBytes(originalJobParameters[jobId], "source_id").String(), + DestinationID: params.AsyncOutput.DestinationID, + SourceID: gjson.GetBytes(params.OriginalJobParameters[jobId], "source_id").String(), } status := jobsdb.JobStatusT{ JobID: jobId, JobState: jobsdb.Importing.State, - AttemptNum: attemptNums[jobId] + 1, + AttemptNum: params.AttemptNums[jobId] + 1, ExecTime: time.Now(), RetryTime: time.Now(), ErrorCode: "200", - ErrorResponse: routerutils.EnhanceJsonWithTime(firstAttemptedAts[jobId], "firstAttemptedAt", routerutils.EmptyPayload), - Parameters: asyncOutput.ImportingParameters, - JobParameters: originalJobParameters[jobId], + ErrorResponse: routerutils.EnhanceJsonWithTime(params.FirstAttemptedAts[jobId], "firstAttemptedAt", routerutils.EmptyPayload), + Parameters: params.AsyncOutput.ImportingParameters, + JobParameters: params.OriginalJobParameters[jobId], WorkspaceId: workspaceID, } statusList = append(statusList, &status) } } - if len(asyncOutput.SucceededJobIDs) > 0 { - for _, jobId := range asyncOutput.SucceededJobIDs { + if len(params.AsyncOutput.SucceededJobIDs) > 0 { + for _, jobId := range params.AsyncOutput.SucceededJobIDs { jobIDConnectionDetailsMap[jobId] = jobsdb.ConnectionDetails{ - DestinationID: asyncOutput.DestinationID, - SourceID: gjson.GetBytes(originalJobParameters[jobId], "source_id").String(), + DestinationID: params.AsyncOutput.DestinationID, + SourceID: gjson.GetBytes(params.OriginalJobParameters[jobId], "source_id").String(), } status := jobsdb.JobStatusT{ JobID: jobId, JobState: jobsdb.Succeeded.State, - AttemptNum: attemptNums[jobId], + AttemptNum: params.AttemptNums[jobId], ExecTime: time.Now(), RetryTime: time.Now(), ErrorCode: "200", - ErrorResponse: routerutils.EnhanceJsonWithTime(firstAttemptedAts[jobId], "firstAttemptedAt", stdjson.RawMessage(asyncOutput.SuccessResponse)), + ErrorResponse: routerutils.EnhanceJsonWithTime(params.FirstAttemptedAts[jobId], "firstAttemptedAt", stdjson.RawMessage(params.AsyncOutput.SuccessResponse)), Parameters: routerutils.EmptyPayload, - JobParameters: originalJobParameters[jobId], + JobParameters: params.OriginalJobParameters[jobId], WorkspaceId: workspaceID, } statusList = append(statusList, &status) - completedJobsList = append(completedJobsList, brt.createFakeJob(jobId, originalJobParameters[jobId])) + completedJobsList = append(completedJobsList, brt.createFakeJob(jobId, params.OriginalJobParameters[jobId])) } } - if len(asyncOutput.FailedJobIDs) > 0 { - for _, jobId := range asyncOutput.FailedJobIDs { + if len(params.AsyncOutput.FailedJobIDs) > 0 { + for _, jobId := range params.AsyncOutput.FailedJobIDs { jobIDConnectionDetailsMap[jobId] = jobsdb.ConnectionDetails{ - DestinationID: asyncOutput.DestinationID, - SourceID: gjson.GetBytes(originalJobParameters[jobId], "source_id").String(), + DestinationID: params.AsyncOutput.DestinationID, + SourceID: gjson.GetBytes(params.OriginalJobParameters[jobId], "source_id").String(), } - resp := misc.UpdateJSONWithNewKeyVal(routerutils.EmptyPayload, "error", asyncOutput.FailedReason) + resp := misc.UpdateJSONWithNewKeyVal(routerutils.EmptyPayload, "error", params.AsyncOutput.FailedReason) status := jobsdb.JobStatusT{ JobID: jobId, JobState: jobsdb.Failed.State, - AttemptNum: attemptNums[jobId], + AttemptNum: params.AttemptNums[jobId], ExecTime: time.Now(), RetryTime: time.Now(), ErrorCode: "500", - ErrorResponse: routerutils.EnhanceJsonWithTime(firstAttemptedAts[jobId], "firstAttemptedAt", resp), + ErrorResponse: routerutils.EnhanceJsonWithTime(params.FirstAttemptedAts[jobId], "firstAttemptedAt", resp), Parameters: routerutils.EmptyPayload, - JobParameters: originalJobParameters[jobId], + JobParameters: params.OriginalJobParameters[jobId], WorkspaceId: workspaceID, } - if attempted { - status.AttemptNum = attemptNums[jobId] + 1 + if params.Attempted { + status.AttemptNum = params.AttemptNums[jobId] + 1 } if brt.retryLimitReached(&status) { status.JobState = jobsdb.Aborted.State - completedJobsList = append(completedJobsList, brt.createFakeJob(jobId, originalJobParameters[jobId])) + completedJobsList = append(completedJobsList, brt.createFakeJob(jobId, params.OriginalJobParameters[jobId])) } statusList = append(statusList, &status) } } - if len(asyncOutput.AbortJobIDs) > 0 { - for _, jobId := range asyncOutput.AbortJobIDs { + if len(params.AsyncOutput.AbortJobIDs) > 0 { + for _, jobId := range params.AsyncOutput.AbortJobIDs { jobIDConnectionDetailsMap[jobId] = jobsdb.ConnectionDetails{ - DestinationID: asyncOutput.DestinationID, - SourceID: gjson.GetBytes(originalJobParameters[jobId], "source_id").String(), + DestinationID: params.AsyncOutput.DestinationID, + SourceID: gjson.GetBytes(params.OriginalJobParameters[jobId], "source_id").String(), } - resp := misc.UpdateJSONWithNewKeyVal(routerutils.EmptyPayload, "error", asyncOutput.AbortReason) + resp := misc.UpdateJSONWithNewKeyVal(routerutils.EmptyPayload, "error", params.AsyncOutput.AbortReason) status := jobsdb.JobStatusT{ JobID: jobId, JobState: jobsdb.Aborted.State, - AttemptNum: attemptNums[jobId], + AttemptNum: params.AttemptNums[jobId], ExecTime: time.Now(), RetryTime: time.Now(), ErrorCode: "400", - ErrorResponse: routerutils.EnhanceJsonWithTime(firstAttemptedAts[jobId], "firstAttemptedAt", stdjson.RawMessage(resp)), + ErrorResponse: routerutils.EnhanceJsonWithTime(params.FirstAttemptedAts[jobId], "firstAttemptedAt", stdjson.RawMessage(resp)), Parameters: routerutils.EmptyPayload, - JobParameters: originalJobParameters[jobId], + JobParameters: params.OriginalJobParameters[jobId], WorkspaceId: workspaceID, } statusList = append(statusList, &status) - completedJobsList = append(completedJobsList, brt.createFakeJob(jobId, originalJobParameters[jobId])) + completedJobsList = append(completedJobsList, brt.createFakeJob(jobId, params.OriginalJobParameters[jobId])) } } @@ -756,11 +793,15 @@ func (brt *Handle) setMultipleJobStatus(asyncOutput common.AsyncUploadOutput, at parameterFilters := []jobsdb.ParameterFilterT{ { Name: "destination_id", - Value: asyncOutput.DestinationID, + Value: params.AsyncOutput.DestinationID, }, } - reportMetrics := brt.getReportMetrics(statusList, originalJobParameters) + reportMetrics := brt.getReportMetrics(getReportMetricsParams{ + StatusList: statusList, + ParametersMap: params.OriginalJobParameters, + JobsList: params.JobsList, + }) // Mark the status of the jobs err := misc.RetryWithNotify(context.Background(), brt.jobsDBCommandTimeout.Load(), brt.jobdDBMaxRetries.Load(), func(ctx context.Context) error { @@ -794,12 +835,12 @@ func (brt *Handle) setMultipleJobStatus(asyncOutput common.AsyncUploadOutput, at brt.destType, float64(len(completedJobsList)), ) - if attempted { + if params.Attempted { var sourceID string if len(statusList) > 0 { - sourceID = gjson.GetBytes(originalJobParameters[statusList[0].JobID], "source_id").String() + sourceID = gjson.GetBytes(params.OriginalJobParameters[statusList[0].JobID], "source_id").String() } - brt.recordAsyncDestinationDeliveryStatus(sourceID, asyncOutput.DestinationID, statusList) + brt.recordAsyncDestinationDeliveryStatus(sourceID, params.AsyncOutput.DestinationID, statusList) } } diff --git a/router/batchrouter/types.go b/router/batchrouter/types.go index 0df69805be..5635dbc5fc 100644 --- a/router/batchrouter/types.go +++ b/router/batchrouter/types.go @@ -1,10 +1,12 @@ package batchrouter import ( + stdjson "encoding/json" "time" backendconfig "github.com/rudderlabs/rudder-server/backend-config" "github.com/rudderlabs/rudder-server/jobsdb" + "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common" router_utils "github.com/rudderlabs/rudder-server/router/utils" ) @@ -59,3 +61,18 @@ type BatchedJobs struct { TimeWindow time.Time JobState string // ENUM waiting, executing, succeeded, waiting_retry, filtered, failed, aborted, migrating, migrated, wont_migrate } + +type getReportMetricsParams struct { + StatusList []*jobsdb.JobStatusT + ParametersMap map[int64]stdjson.RawMessage + JobsList []*jobsdb.JobT +} + +type setMultipleJobStatusParams struct { + AsyncOutput common.AsyncUploadOutput + Attempted bool + AttemptNums map[int64]int + FirstAttemptedAts map[int64]time.Time + OriginalJobParameters map[int64]stdjson.RawMessage + JobsList []*jobsdb.JobT +} diff --git a/router/batchrouter/worker.go b/router/batchrouter/worker.go index c38cc5d9c2..fbe539cf4e 100644 --- a/router/batchrouter/worker.go +++ b/router/batchrouter/worker.go @@ -140,7 +140,10 @@ func (w *worker) processJobAsync(jobsWg *sync.WaitGroup, destinationJobs *Destin if err != nil { panic(fmt.Errorf("storing %s jobs into ErrorDB: %w", brt.destType, err)) } - reportMetrics := brt.getReportMetrics(drainList, brt.getParamertsFromJobs(drainJobList)) + reportMetrics := brt.getReportMetrics(getReportMetricsParams{ + StatusList: drainList, + ParametersMap: brt.getParamertsFromJobs(drainJobList), + }) err = misc.RetryWithNotify(context.Background(), brt.jobsDBCommandTimeout.Load(), brt.jobdDBMaxRetries.Load(), func(ctx context.Context) error { return brt.jobsDB.WithUpdateSafeTx(ctx, func(tx jobsdb.UpdateSafeTx) error { err := brt.jobsDB.UpdateJobStatusInTx(ctx, tx, drainList, []string{brt.destType}, parameterFilters)