Skip to content

Commit

Permalink
Merge branch 'master' into chore.jobsdbPayloadColumnType
Browse files Browse the repository at this point in the history
  • Loading branch information
Sidddddarth committed Dec 28, 2024
2 parents ece9675 + 18f4bdf commit b55615a
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 86 deletions.
6 changes: 0 additions & 6 deletions jobsdb/jobsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,6 @@ const (
pgErrorCodeTableReadonly = "RS001"
)

var payloadTypes = map[payloadColumnType]string{
JSONB: "jsonb",
TEXT: "text",
BYTEA: "bytea",
}

type payloadColumnType string

const (
Expand Down
105 changes: 58 additions & 47 deletions services/oauth/v2/oauth.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"net/http"
"strings"
"time"

"github.com/tidwall/gjson"
Expand Down Expand Up @@ -141,11 +142,11 @@ func (h *OAuthHandler) FetchToken(fetchTokenParams *RefreshTokenParams) (int, *A
authErrCategory: "",
errorMessage: "",
destDefName: fetchTokenParams.DestDefName,
isTokenFetch: true,
flowType: h.RudderFlowType,
action: "fetch_token",
}
return h.GetTokenInfo(fetchTokenParams, "Fetch token", authStats)
statshandler := NewStatsHandlerFromOAuthStats(authStats)
return h.GetTokenInfo(fetchTokenParams, "Fetch token", statshandler)
}

/*
Expand Down Expand Up @@ -176,10 +177,11 @@ func (h *OAuthHandler) RefreshToken(refTokenParams *RefreshTokenParams) (int, *A
action: "refresh_token",
stats: h.stats,
}
return h.GetTokenInfo(refTokenParams, "Refresh token", authStats)
statsHandler := NewStatsHandlerFromOAuthStats(authStats)
return h.GetTokenInfo(refTokenParams, "Refresh token", statsHandler)
}

func (h *OAuthHandler) GetTokenInfo(refTokenParams *RefreshTokenParams, logTypeName string, authStats *OAuthStats) (int, *AuthResponse, error) {
func (h *OAuthHandler) GetTokenInfo(refTokenParams *RefreshTokenParams, logTypeName string, statsHandler OAuthStatsHandler) (int, *AuthResponse, error) {
log := h.Logger.Withn(
logger.NewStringField("Call Type", logTypeName),
logger.NewStringField("AccountId", refTokenParams.AccountID),
Expand All @@ -189,13 +191,13 @@ func (h *OAuthHandler) GetTokenInfo(refTokenParams *RefreshTokenParams, logTypeN
)
log.Debugn("[request] :: Get Token Info request received")
startTime := time.Now()
defer func() {
authStats.statName = GetOAuthActionStatName("total_latency")
authStats.isCallToCpApi = false
authStats.SendTimerStats(startTime)
}()
h.CacheMutex.Lock(refTokenParams.AccountID)
defer h.CacheMutex.Unlock(refTokenParams.AccountID)
defer func() {
statsHandler.SendTiming(startTime, "total_latency", stats.Tags{
"isCallToCpApi": "false",
})
}()
refTokenBody := RefreshTokenBodyParams{}
storedCache, ok := h.Cache.Load(refTokenParams.AccountID)
if ok {
Expand All @@ -205,7 +207,7 @@ func (h *OAuthHandler) GetTokenInfo(refTokenParams *RefreshTokenParams, logTypeN
return http.StatusInternalServerError, nil, errors.New("failed to type assert the stored cache")
}
// TODO: verify if the storedCache is nil at this point
if !checkIfTokenExpired(cachedSecret.Account, refTokenParams.Secret, h.ExpirationTimeDiff, authStats) {
if !checkIfTokenExpired(cachedSecret.Account, refTokenParams.Secret, h.ExpirationTimeDiff, statsHandler) {
return http.StatusOK, cachedSecret, nil
}
// Refresh token preparation
Expand All @@ -214,7 +216,7 @@ func (h *OAuthHandler) GetTokenInfo(refTokenParams *RefreshTokenParams, logTypeN
ExpiredSecret: refTokenParams.Secret,
}
}
statusCode, refSecret, refErr := h.fetchAccountInfoFromCp(refTokenParams, refTokenBody, authStats, logTypeName)
statusCode, refSecret, refErr := h.fetchAccountInfoFromCp(refTokenParams, refTokenBody, statsHandler, logTypeName)
// handling of refresh token response
if statusCode == http.StatusOK {
// fetching/refreshing through control plane was successful
Expand All @@ -241,11 +243,7 @@ func (h *OAuthHandler) AuthStatusToggle(params *AuthStatusToggleParams) (statusC
action: action,
stats: h.stats,
}
defer func() {
authStatusToggleStats.statName = GetOAuthActionStatName("total_latency")
authStatusToggleStats.isCallToCpApi = false
authStatusToggleStats.SendTimerStats(authErrHandlerTimeStart)
}()
statsHandler := NewStatsHandlerFromOAuthStats(authStatusToggleStats)
h.CacheMutex.Lock(params.RudderAccountID)
isAuthStatusUpdateActive, isAuthStatusUpdateReqPresent := h.AuthStatusUpdateActiveMap[destinationId]
if isAuthStatusUpdateReqPresent && isAuthStatusUpdateActive {
Expand All @@ -266,6 +264,11 @@ func (h *OAuthHandler) AuthStatusToggle(params *AuthStatusToggleParams) (statusC
h.Cache.Delete(params.RudderAccountID)
h.CacheMutex.Unlock(params.RudderAccountID)
}()
defer func() {
statsHandler.SendTiming(authErrHandlerTimeStart, "total_latency", stats.Tags{
"isCallToCpApi": "false",
})
}()

authStatusToggleUrl := fmt.Sprintf("%s/workspaces/%s/destinations/%s/authStatus/toggle", h.ConfigBEURL, params.WorkspaceID, destinationId)

Expand All @@ -278,14 +281,15 @@ func (h *OAuthHandler) AuthStatusToggle(params *AuthStatusToggleParams) (statusC
RequestType: action,
BasicAuthUser: h.Identity(),
}
authStatusToggleStats.statName = GetOAuthActionStatName("request_sent")
authStatusToggleStats.isCallToCpApi = true
authStatusToggleStats.SendCountStat()
statsHandler.Increment("request_sent", stats.Tags{
"isCallToCpApi": "true",
})

cpiCallStartTime := time.Now()
statusCode, respBody = h.CpConn.CpApiCall(authStatusInactiveCpReq)
authStatusToggleStats.statName = GetOAuthActionStatName("request_latency")
authStatusToggleStats.SendTimerStats(cpiCallStartTime)
statsHandler.SendTiming(cpiCallStartTime, "request_latency", stats.Tags{
"isCallToCpApi": "true",
})
h.Logger.Debugn("[request] :: Response from CP for auth status inactive req",
logger.NewIntField("StatusCode", int64(statusCode)),
logger.NewStringField("Response", respBody))
Expand All @@ -299,18 +303,20 @@ func (h *OAuthHandler) AuthStatusToggle(params *AuthStatusToggleParams) (statusC
} else {
msg = fmt.Sprintf("Could not update authStatus to inactive for destination: %v", authStatusToggleRes.Message)
}
authStatusToggleStats.statName = GetOAuthActionStatName("request")
authStatusToggleStats.errorMessage = msg
authStatusToggleStats.SendCountStat()
statsHandler.Increment("request", stats.Tags{
"errorMessage": msg,
"isCallToCpApi": "true",
})
return http.StatusBadRequest, ErrPermissionOrTokenRevoked.Error()
}
h.Logger.Debugn("[request] :: (Write) auth status inactive Response received",
logger.NewIntField("StatusCode", int64(statusCode)),
logger.NewStringField("Response", respBody))
authStatusToggleStats.statName = GetOAuthActionStatName("request")
authStatusToggleStats.errorMessage = ""
authStatusToggleStats.SendCountStat()

statsHandler.Increment("request", stats.Tags{
"errorMessage": "",
"isCallToCpApi": "true",
})
return http.StatusBadRequest, ErrPermissionOrTokenRevoked.Error()
}

Expand Down Expand Up @@ -342,14 +348,15 @@ func (h *OAuthHandler) GetRefreshTokenErrResp(response string, accountSecret *Ac
// This method hits the Control Plane to get the account information
// As well update the account information into the destAuthInfoMap(which acts as an in-memory cache)
func (h *OAuthHandler) fetchAccountInfoFromCp(refTokenParams *RefreshTokenParams, refTokenBody RefreshTokenBodyParams,
authStats *OAuthStats, logTypeName string,
statsHandler OAuthStatsHandler, logTypeName string,
) (int, *AuthResponse, error) {
actionType := strings.Join(strings.Fields(strings.ToLower(logTypeName)), "_")
refreshUrl := fmt.Sprintf("%s/destination/workspaces/%s/accounts/%s/token", h.ConfigBEURL, refTokenParams.WorkspaceID, refTokenParams.AccountID)
res, err := json.Marshal(refTokenBody)
if err != nil {
authStats.statName = GetOAuthActionStatName("request")
authStats.errorMessage = "error in marshalling refresh token body"
authStats.SendCountStat()
statsHandler.Increment("request", stats.Tags{
"errorMessage": "error in marshalling refresh token body",
})
return http.StatusInternalServerError, nil, err
}
refreshCpReq := &controlplane.Request{
Expand All @@ -358,20 +365,21 @@ func (h *OAuthHandler) fetchAccountInfoFromCp(refTokenParams *RefreshTokenParams
ContentType: "application/json; charset=utf-8",
Body: string(res),
DestName: refTokenParams.DestDefName,
RequestType: authStats.action,
RequestType: actionType,
BasicAuthUser: h.TokenProvider.Identity(),
}
var accountSecret AccountSecret
// Stat for counting number of Refresh Token endpoint calls
authStats.statName = GetOAuthActionStatName("request_sent")
authStats.isCallToCpApi = true
authStats.errorMessage = ""
authStats.SendCountStat()
statsHandler.Increment("request_sent", stats.Tags{
"isCallToCpApi": "true",
"errorMessage": "",
})

cpiCallStartTime := time.Now()
statusCode, response := h.CpConn.CpApiCall(refreshCpReq)
authStats.statName = GetOAuthActionStatName("request_latency")
authStats.SendTimerStats(cpiCallStartTime)
statsHandler.SendTiming(cpiCallStartTime, "request_latency", stats.Tags{
"isCallToCpApi": "true",
})

log := h.Logger.Withn(logger.NewIntField("StatusCode", int64(statusCode)),
logger.NewIntField("WorkerId", int64(refTokenParams.WorkerID)),
Expand All @@ -380,9 +388,10 @@ func (h *OAuthHandler) fetchAccountInfoFromCp(refTokenParams *RefreshTokenParams

// Empty Refresh token response
if !routerutils.IsNotEmptyString(response) {
authStats.statName = GetOAuthActionStatName("request")
authStats.errorMessage = "Empty secret"
authStats.SendCountStat()
statsHandler.Increment("request", stats.Tags{
"errorMessage": "Empty secret",
"isCallToCpApi": "true",
})
// Setting empty accessToken value into in-memory auth info map(cache)
h.Logger.Debugn("Empty response from Control-Plane",
logger.NewStringField("Response", response),
Expand All @@ -398,19 +407,21 @@ func (h *OAuthHandler) fetchAccountInfoFromCp(refTokenParams *RefreshTokenParams
Err: errType,
ErrorMessage: refErrMsg,
}
authStats.statName = GetOAuthActionStatName("request")
authStats.errorMessage = errType
authStats.SendCountStat()
statsHandler.Increment("request", stats.Tags{
"errorMessage": errType,
"isCallToCpApi": "true",
})
if authResponse.Err == common.RefTokenInvalidGrant {
// Should abort the event as refresh is not going to work
// until we have new refresh token for the account
return http.StatusBadRequest, authResponse, fmt.Errorf("invalid grant")
}
return http.StatusInternalServerError, authResponse, fmt.Errorf("error occurred while fetching/refreshing account info from CP: %s", refErrMsg)
}
authStats.statName = GetOAuthActionStatName("request")
authStats.errorMessage = ""
authStats.SendCountStat()
statsHandler.Increment("request", stats.Tags{
"errorMessage": "",
"isCallToCpApi": "true",
})
log.Debugn("[request] :: (Write) Account Secret received")
// Store expirationDate information
accountSecret.ExpirationDate = gjson.Get(response, "secret.expirationDate").String()
Expand Down
64 changes: 37 additions & 27 deletions services/oauth/v2/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,17 @@ package v2

import (
"strconv"
"strings"
"time"

"github.com/samber/lo"

"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-server/services/oauth/v2/common"
)

const OAUTH_V2_STAT_PREFIX = "oauth_action"

type OAuthStats struct {
stats stats.Stats
id string // destinationId -> for action == auth_status_inactive, accountId -> for action == refresh_token/fetch_token
Expand All @@ -18,40 +23,45 @@ type OAuthStats struct {
isCallToCpApi bool // is a call being made to control-plane APIs
authErrCategory string // for action=refresh_token -> REFRESH_TOKEN, for action=fetch_token -> "", for action=auth_status_inactive -> auth_status_inactive
destDefName string
isTokenFetch bool // This stats field is used to identify if a request to get token is arising from processor
flowType common.RudderFlow // delivery, delete
action string // refresh_token, fetch_token, auth_status_inactive
}

func (s *OAuthStats) SendTimerStats(startTime time.Time) {
statsTags := stats.Tags{
"id": s.id,
"workspaceId": s.workspaceID,
"rudderCategory": s.rudderCategory,
"isCallToCpApi": strconv.FormatBool(s.isCallToCpApi),
"authErrCategory": s.authErrCategory,
"destType": s.destDefName,
"flowType": string(s.flowType),
"action": s.action,
type OAuthStatsHandler struct {
stats stats.Stats
defaultTags stats.Tags
}

func GetDefaultTagsFromOAuthStats(oauthStats *OAuthStats) stats.Tags {
return stats.Tags{
"id": oauthStats.id,
"workspaceId": oauthStats.workspaceID,
"rudderCategory": "destination",
"isCallToCpApi": strconv.FormatBool(oauthStats.isCallToCpApi),
"authErrCategory": oauthStats.authErrCategory,
"destType": oauthStats.destDefName,
"flowType": string(oauthStats.flowType),
"action": oauthStats.action,
"oauthVersion": "v2",
}
s.stats.NewTaggedStat(s.statName, stats.TimerType, statsTags).SendTiming(time.Since(startTime))
}

// SendCountStat Send count type stats related to OAuth(Destination)
func (s *OAuthStats) SendCountStat() {
statsTags := stats.Tags{
"oauthVersion": "v2",
"id": s.id,
"workspaceId": s.workspaceID,
"rudderCategory": s.rudderCategory,
"errorMessage": s.errorMessage,
"isCallToCpApi": strconv.FormatBool(s.isCallToCpApi),
"authErrCategory": s.authErrCategory,
"destType": s.destDefName,
"isTokenFetch": strconv.FormatBool(s.isTokenFetch),
"flowType": string(s.flowType),
"action": s.action,
func NewStatsHandlerFromOAuthStats(oauthStats *OAuthStats) OAuthStatsHandler {
defaultTags := GetDefaultTagsFromOAuthStats(oauthStats)
return OAuthStatsHandler{
stats: oauthStats.stats,
defaultTags: defaultTags,
}
s.stats.NewTaggedStat(s.statName, stats.CountType, statsTags).Increment()
}

func (m *OAuthStatsHandler) Increment(statSuffix string, tags stats.Tags) {
statName := strings.Join([]string{OAUTH_V2_STAT_PREFIX, statSuffix}, "_")
allTags := lo.Assign(m.defaultTags, tags)
m.stats.NewTaggedStat(statName, stats.CountType, allTags).Increment()
}

func (m *OAuthStatsHandler) SendTiming(startTime time.Time, statSuffix string, tags stats.Tags) {
statName := strings.Join([]string{OAUTH_V2_STAT_PREFIX, statSuffix}, "_")
allTags := lo.Assign(m.defaultTags, tags)
m.stats.NewTaggedStat(statName, stats.TimerType, allTags).SendTiming(time.Since(startTime))
}
13 changes: 7 additions & 6 deletions services/oauth/v2/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"time"

"github.com/rudderlabs/rudder-go-kit/stats"
routerutils "github.com/rudderlabs/rudder-server/router/utils"
"github.com/rudderlabs/rudder-server/services/oauth/v2/common"
"github.com/rudderlabs/rudder-server/utils/misc"
Expand All @@ -21,8 +22,8 @@ func GetOAuthActionStatName(stat string) string {
return fmt.Sprintf("oauth_action_%v", stat)
}

func checkIfTokenExpired(secret AccountSecret, oldSecret json.RawMessage, expiryTimeDiff time.Duration, stats *OAuthStats) bool {
if secret.ExpirationDate != "" && isTokenExpired(secret.ExpirationDate, expiryTimeDiff, stats) {
func checkIfTokenExpired(secret AccountSecret, oldSecret json.RawMessage, expiryTimeDiff time.Duration, statsHandler OAuthStatsHandler) bool {
if secret.ExpirationDate != "" && isTokenExpired(secret.ExpirationDate, expiryTimeDiff, &statsHandler) {
return true
}
if !routerutils.IsNotEmptyString(string(oldSecret)) {
Expand All @@ -31,12 +32,12 @@ func checkIfTokenExpired(secret AccountSecret, oldSecret json.RawMessage, expiry
return bytes.Equal(secret.Secret, oldSecret)
}

func isTokenExpired(expirationDate string, expirationTimeDiff time.Duration, stats *OAuthStats) bool {
func isTokenExpired(expirationDate string, expirationTimeDiff time.Duration, statsHandler *OAuthStatsHandler) bool {
date, err := time.Parse(misc.RFC3339Milli, expirationDate)
if err != nil {
stats.errorMessage = "parsing failed"
stats.statName = GetOAuthActionStatName("proactive_token_refresh")
stats.SendCountStat()
statsHandler.Increment("proactive_token_refresh", stats.Tags{
"errorMessage": "parsing failed",
})
return false
}
return date.Before(time.Now().Add(expirationTimeDiff))
Expand Down

0 comments on commit b55615a

Please sign in to comment.