diff --git a/jobsdb/jobsdb.go b/jobsdb/jobsdb.go index 9bb0bf1161..c540f878f8 100644 --- a/jobsdb/jobsdb.go +++ b/jobsdb/jobsdb.go @@ -73,12 +73,6 @@ const ( pgErrorCodeTableReadonly = "RS001" ) -var payloadTypes = map[payloadColumnType]string{ - JSONB: "jsonb", - TEXT: "text", - BYTEA: "bytea", -} - type payloadColumnType string const ( diff --git a/services/oauth/v2/oauth.go b/services/oauth/v2/oauth.go index 70efa0ca53..1be55950a0 100644 --- a/services/oauth/v2/oauth.go +++ b/services/oauth/v2/oauth.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "net/http" + "strings" "time" "github.com/tidwall/gjson" @@ -141,11 +142,11 @@ func (h *OAuthHandler) FetchToken(fetchTokenParams *RefreshTokenParams) (int, *A authErrCategory: "", errorMessage: "", destDefName: fetchTokenParams.DestDefName, - isTokenFetch: true, flowType: h.RudderFlowType, action: "fetch_token", } - return h.GetTokenInfo(fetchTokenParams, "Fetch token", authStats) + statshandler := NewStatsHandlerFromOAuthStats(authStats) + return h.GetTokenInfo(fetchTokenParams, "Fetch token", statshandler) } /* @@ -176,10 +177,11 @@ func (h *OAuthHandler) RefreshToken(refTokenParams *RefreshTokenParams) (int, *A action: "refresh_token", stats: h.stats, } - return h.GetTokenInfo(refTokenParams, "Refresh token", authStats) + statsHandler := NewStatsHandlerFromOAuthStats(authStats) + return h.GetTokenInfo(refTokenParams, "Refresh token", statsHandler) } -func (h *OAuthHandler) GetTokenInfo(refTokenParams *RefreshTokenParams, logTypeName string, authStats *OAuthStats) (int, *AuthResponse, error) { +func (h *OAuthHandler) GetTokenInfo(refTokenParams *RefreshTokenParams, logTypeName string, statsHandler OAuthStatsHandler) (int, *AuthResponse, error) { log := h.Logger.Withn( logger.NewStringField("Call Type", logTypeName), logger.NewStringField("AccountId", refTokenParams.AccountID), @@ -189,13 +191,13 @@ func (h *OAuthHandler) GetTokenInfo(refTokenParams *RefreshTokenParams, logTypeN ) log.Debugn("[request] :: Get Token Info request received") startTime := time.Now() - defer func() { - authStats.statName = GetOAuthActionStatName("total_latency") - authStats.isCallToCpApi = false - authStats.SendTimerStats(startTime) - }() h.CacheMutex.Lock(refTokenParams.AccountID) defer h.CacheMutex.Unlock(refTokenParams.AccountID) + defer func() { + statsHandler.SendTiming(startTime, "total_latency", stats.Tags{ + "isCallToCpApi": "false", + }) + }() refTokenBody := RefreshTokenBodyParams{} storedCache, ok := h.Cache.Load(refTokenParams.AccountID) if ok { @@ -205,7 +207,7 @@ func (h *OAuthHandler) GetTokenInfo(refTokenParams *RefreshTokenParams, logTypeN return http.StatusInternalServerError, nil, errors.New("failed to type assert the stored cache") } // TODO: verify if the storedCache is nil at this point - if !checkIfTokenExpired(cachedSecret.Account, refTokenParams.Secret, h.ExpirationTimeDiff, authStats) { + if !checkIfTokenExpired(cachedSecret.Account, refTokenParams.Secret, h.ExpirationTimeDiff, statsHandler) { return http.StatusOK, cachedSecret, nil } // Refresh token preparation @@ -214,7 +216,7 @@ func (h *OAuthHandler) GetTokenInfo(refTokenParams *RefreshTokenParams, logTypeN ExpiredSecret: refTokenParams.Secret, } } - statusCode, refSecret, refErr := h.fetchAccountInfoFromCp(refTokenParams, refTokenBody, authStats, logTypeName) + statusCode, refSecret, refErr := h.fetchAccountInfoFromCp(refTokenParams, refTokenBody, statsHandler, logTypeName) // handling of refresh token response if statusCode == http.StatusOK { // fetching/refreshing through control plane was successful @@ -241,11 +243,7 @@ func (h *OAuthHandler) AuthStatusToggle(params *AuthStatusToggleParams) (statusC action: action, stats: h.stats, } - defer func() { - authStatusToggleStats.statName = GetOAuthActionStatName("total_latency") - authStatusToggleStats.isCallToCpApi = false - authStatusToggleStats.SendTimerStats(authErrHandlerTimeStart) - }() + statsHandler := NewStatsHandlerFromOAuthStats(authStatusToggleStats) h.CacheMutex.Lock(params.RudderAccountID) isAuthStatusUpdateActive, isAuthStatusUpdateReqPresent := h.AuthStatusUpdateActiveMap[destinationId] if isAuthStatusUpdateReqPresent && isAuthStatusUpdateActive { @@ -266,6 +264,11 @@ func (h *OAuthHandler) AuthStatusToggle(params *AuthStatusToggleParams) (statusC h.Cache.Delete(params.RudderAccountID) h.CacheMutex.Unlock(params.RudderAccountID) }() + defer func() { + statsHandler.SendTiming(authErrHandlerTimeStart, "total_latency", stats.Tags{ + "isCallToCpApi": "false", + }) + }() authStatusToggleUrl := fmt.Sprintf("%s/workspaces/%s/destinations/%s/authStatus/toggle", h.ConfigBEURL, params.WorkspaceID, destinationId) @@ -278,14 +281,15 @@ func (h *OAuthHandler) AuthStatusToggle(params *AuthStatusToggleParams) (statusC RequestType: action, BasicAuthUser: h.Identity(), } - authStatusToggleStats.statName = GetOAuthActionStatName("request_sent") - authStatusToggleStats.isCallToCpApi = true - authStatusToggleStats.SendCountStat() + statsHandler.Increment("request_sent", stats.Tags{ + "isCallToCpApi": "true", + }) cpiCallStartTime := time.Now() statusCode, respBody = h.CpConn.CpApiCall(authStatusInactiveCpReq) - authStatusToggleStats.statName = GetOAuthActionStatName("request_latency") - authStatusToggleStats.SendTimerStats(cpiCallStartTime) + statsHandler.SendTiming(cpiCallStartTime, "request_latency", stats.Tags{ + "isCallToCpApi": "true", + }) h.Logger.Debugn("[request] :: Response from CP for auth status inactive req", logger.NewIntField("StatusCode", int64(statusCode)), logger.NewStringField("Response", respBody)) @@ -299,18 +303,20 @@ func (h *OAuthHandler) AuthStatusToggle(params *AuthStatusToggleParams) (statusC } else { msg = fmt.Sprintf("Could not update authStatus to inactive for destination: %v", authStatusToggleRes.Message) } - authStatusToggleStats.statName = GetOAuthActionStatName("request") - authStatusToggleStats.errorMessage = msg - authStatusToggleStats.SendCountStat() + statsHandler.Increment("request", stats.Tags{ + "errorMessage": msg, + "isCallToCpApi": "true", + }) return http.StatusBadRequest, ErrPermissionOrTokenRevoked.Error() } h.Logger.Debugn("[request] :: (Write) auth status inactive Response received", logger.NewIntField("StatusCode", int64(statusCode)), logger.NewStringField("Response", respBody)) - authStatusToggleStats.statName = GetOAuthActionStatName("request") - authStatusToggleStats.errorMessage = "" - authStatusToggleStats.SendCountStat() + statsHandler.Increment("request", stats.Tags{ + "errorMessage": "", + "isCallToCpApi": "true", + }) return http.StatusBadRequest, ErrPermissionOrTokenRevoked.Error() } @@ -342,14 +348,15 @@ func (h *OAuthHandler) GetRefreshTokenErrResp(response string, accountSecret *Ac // This method hits the Control Plane to get the account information // As well update the account information into the destAuthInfoMap(which acts as an in-memory cache) func (h *OAuthHandler) fetchAccountInfoFromCp(refTokenParams *RefreshTokenParams, refTokenBody RefreshTokenBodyParams, - authStats *OAuthStats, logTypeName string, + statsHandler OAuthStatsHandler, logTypeName string, ) (int, *AuthResponse, error) { + actionType := strings.Join(strings.Fields(strings.ToLower(logTypeName)), "_") refreshUrl := fmt.Sprintf("%s/destination/workspaces/%s/accounts/%s/token", h.ConfigBEURL, refTokenParams.WorkspaceID, refTokenParams.AccountID) res, err := json.Marshal(refTokenBody) if err != nil { - authStats.statName = GetOAuthActionStatName("request") - authStats.errorMessage = "error in marshalling refresh token body" - authStats.SendCountStat() + statsHandler.Increment("request", stats.Tags{ + "errorMessage": "error in marshalling refresh token body", + }) return http.StatusInternalServerError, nil, err } refreshCpReq := &controlplane.Request{ @@ -358,20 +365,21 @@ func (h *OAuthHandler) fetchAccountInfoFromCp(refTokenParams *RefreshTokenParams ContentType: "application/json; charset=utf-8", Body: string(res), DestName: refTokenParams.DestDefName, - RequestType: authStats.action, + RequestType: actionType, BasicAuthUser: h.TokenProvider.Identity(), } var accountSecret AccountSecret // Stat for counting number of Refresh Token endpoint calls - authStats.statName = GetOAuthActionStatName("request_sent") - authStats.isCallToCpApi = true - authStats.errorMessage = "" - authStats.SendCountStat() + statsHandler.Increment("request_sent", stats.Tags{ + "isCallToCpApi": "true", + "errorMessage": "", + }) cpiCallStartTime := time.Now() statusCode, response := h.CpConn.CpApiCall(refreshCpReq) - authStats.statName = GetOAuthActionStatName("request_latency") - authStats.SendTimerStats(cpiCallStartTime) + statsHandler.SendTiming(cpiCallStartTime, "request_latency", stats.Tags{ + "isCallToCpApi": "true", + }) log := h.Logger.Withn(logger.NewIntField("StatusCode", int64(statusCode)), logger.NewIntField("WorkerId", int64(refTokenParams.WorkerID)), @@ -380,9 +388,10 @@ func (h *OAuthHandler) fetchAccountInfoFromCp(refTokenParams *RefreshTokenParams // Empty Refresh token response if !routerutils.IsNotEmptyString(response) { - authStats.statName = GetOAuthActionStatName("request") - authStats.errorMessage = "Empty secret" - authStats.SendCountStat() + statsHandler.Increment("request", stats.Tags{ + "errorMessage": "Empty secret", + "isCallToCpApi": "true", + }) // Setting empty accessToken value into in-memory auth info map(cache) h.Logger.Debugn("Empty response from Control-Plane", logger.NewStringField("Response", response), @@ -398,9 +407,10 @@ func (h *OAuthHandler) fetchAccountInfoFromCp(refTokenParams *RefreshTokenParams Err: errType, ErrorMessage: refErrMsg, } - authStats.statName = GetOAuthActionStatName("request") - authStats.errorMessage = errType - authStats.SendCountStat() + statsHandler.Increment("request", stats.Tags{ + "errorMessage": errType, + "isCallToCpApi": "true", + }) if authResponse.Err == common.RefTokenInvalidGrant { // Should abort the event as refresh is not going to work // until we have new refresh token for the account @@ -408,9 +418,10 @@ func (h *OAuthHandler) fetchAccountInfoFromCp(refTokenParams *RefreshTokenParams } return http.StatusInternalServerError, authResponse, fmt.Errorf("error occurred while fetching/refreshing account info from CP: %s", refErrMsg) } - authStats.statName = GetOAuthActionStatName("request") - authStats.errorMessage = "" - authStats.SendCountStat() + statsHandler.Increment("request", stats.Tags{ + "errorMessage": "", + "isCallToCpApi": "true", + }) log.Debugn("[request] :: (Write) Account Secret received") // Store expirationDate information accountSecret.ExpirationDate = gjson.Get(response, "secret.expirationDate").String() diff --git a/services/oauth/v2/stats.go b/services/oauth/v2/stats.go index b6db502237..75fe46d8cf 100644 --- a/services/oauth/v2/stats.go +++ b/services/oauth/v2/stats.go @@ -2,12 +2,17 @@ package v2 import ( "strconv" + "strings" "time" + "github.com/samber/lo" + "github.com/rudderlabs/rudder-go-kit/stats" "github.com/rudderlabs/rudder-server/services/oauth/v2/common" ) +const OAUTH_V2_STAT_PREFIX = "oauth_action" + type OAuthStats struct { stats stats.Stats id string // destinationId -> for action == auth_status_inactive, accountId -> for action == refresh_token/fetch_token @@ -18,40 +23,45 @@ type OAuthStats struct { isCallToCpApi bool // is a call being made to control-plane APIs authErrCategory string // for action=refresh_token -> REFRESH_TOKEN, for action=fetch_token -> "", for action=auth_status_inactive -> auth_status_inactive destDefName string - isTokenFetch bool // This stats field is used to identify if a request to get token is arising from processor flowType common.RudderFlow // delivery, delete action string // refresh_token, fetch_token, auth_status_inactive } -func (s *OAuthStats) SendTimerStats(startTime time.Time) { - statsTags := stats.Tags{ - "id": s.id, - "workspaceId": s.workspaceID, - "rudderCategory": s.rudderCategory, - "isCallToCpApi": strconv.FormatBool(s.isCallToCpApi), - "authErrCategory": s.authErrCategory, - "destType": s.destDefName, - "flowType": string(s.flowType), - "action": s.action, +type OAuthStatsHandler struct { + stats stats.Stats + defaultTags stats.Tags +} + +func GetDefaultTagsFromOAuthStats(oauthStats *OAuthStats) stats.Tags { + return stats.Tags{ + "id": oauthStats.id, + "workspaceId": oauthStats.workspaceID, + "rudderCategory": "destination", + "isCallToCpApi": strconv.FormatBool(oauthStats.isCallToCpApi), + "authErrCategory": oauthStats.authErrCategory, + "destType": oauthStats.destDefName, + "flowType": string(oauthStats.flowType), + "action": oauthStats.action, "oauthVersion": "v2", } - s.stats.NewTaggedStat(s.statName, stats.TimerType, statsTags).SendTiming(time.Since(startTime)) } -// SendCountStat Send count type stats related to OAuth(Destination) -func (s *OAuthStats) SendCountStat() { - statsTags := stats.Tags{ - "oauthVersion": "v2", - "id": s.id, - "workspaceId": s.workspaceID, - "rudderCategory": s.rudderCategory, - "errorMessage": s.errorMessage, - "isCallToCpApi": strconv.FormatBool(s.isCallToCpApi), - "authErrCategory": s.authErrCategory, - "destType": s.destDefName, - "isTokenFetch": strconv.FormatBool(s.isTokenFetch), - "flowType": string(s.flowType), - "action": s.action, +func NewStatsHandlerFromOAuthStats(oauthStats *OAuthStats) OAuthStatsHandler { + defaultTags := GetDefaultTagsFromOAuthStats(oauthStats) + return OAuthStatsHandler{ + stats: oauthStats.stats, + defaultTags: defaultTags, } - s.stats.NewTaggedStat(s.statName, stats.CountType, statsTags).Increment() +} + +func (m *OAuthStatsHandler) Increment(statSuffix string, tags stats.Tags) { + statName := strings.Join([]string{OAUTH_V2_STAT_PREFIX, statSuffix}, "_") + allTags := lo.Assign(m.defaultTags, tags) + m.stats.NewTaggedStat(statName, stats.CountType, allTags).Increment() +} + +func (m *OAuthStatsHandler) SendTiming(startTime time.Time, statSuffix string, tags stats.Tags) { + statName := strings.Join([]string{OAUTH_V2_STAT_PREFIX, statSuffix}, "_") + allTags := lo.Assign(m.defaultTags, tags) + m.stats.NewTaggedStat(statName, stats.TimerType, allTags).SendTiming(time.Since(startTime)) } diff --git a/services/oauth/v2/utils.go b/services/oauth/v2/utils.go index a8754673d1..67cab847fb 100644 --- a/services/oauth/v2/utils.go +++ b/services/oauth/v2/utils.go @@ -7,6 +7,7 @@ import ( "fmt" "time" + "github.com/rudderlabs/rudder-go-kit/stats" routerutils "github.com/rudderlabs/rudder-server/router/utils" "github.com/rudderlabs/rudder-server/services/oauth/v2/common" "github.com/rudderlabs/rudder-server/utils/misc" @@ -21,8 +22,8 @@ func GetOAuthActionStatName(stat string) string { return fmt.Sprintf("oauth_action_%v", stat) } -func checkIfTokenExpired(secret AccountSecret, oldSecret json.RawMessage, expiryTimeDiff time.Duration, stats *OAuthStats) bool { - if secret.ExpirationDate != "" && isTokenExpired(secret.ExpirationDate, expiryTimeDiff, stats) { +func checkIfTokenExpired(secret AccountSecret, oldSecret json.RawMessage, expiryTimeDiff time.Duration, statsHandler OAuthStatsHandler) bool { + if secret.ExpirationDate != "" && isTokenExpired(secret.ExpirationDate, expiryTimeDiff, &statsHandler) { return true } if !routerutils.IsNotEmptyString(string(oldSecret)) { @@ -31,12 +32,12 @@ func checkIfTokenExpired(secret AccountSecret, oldSecret json.RawMessage, expiry return bytes.Equal(secret.Secret, oldSecret) } -func isTokenExpired(expirationDate string, expirationTimeDiff time.Duration, stats *OAuthStats) bool { +func isTokenExpired(expirationDate string, expirationTimeDiff time.Duration, statsHandler *OAuthStatsHandler) bool { date, err := time.Parse(misc.RFC3339Milli, expirationDate) if err != nil { - stats.errorMessage = "parsing failed" - stats.statName = GetOAuthActionStatName("proactive_token_refresh") - stats.SendCountStat() + statsHandler.Increment("proactive_token_refresh", stats.Tags{ + "errorMessage": "parsing failed", + }) return false } return date.Before(time.Now().Add(expirationTimeDiff))