feat: implement backoff for snowpipe streaming authorization errors #5399

Open · wants to merge 6 commits into base: master
router/batchrouter/asyncdestinationmanager/snowpipestreaming/channel.go
@@ -2,6 +2,7 @@

import (
"context"
"errors"
"fmt"

"github.com/rudderlabs/rudder-go-kit/logger"
@@ -12,6 +13,11 @@
whutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

+ var (
+ errAuthz = errors.New("snowpipe authorization error")
+ errBackoff = errors.New("snowpipe backoff error")
+ )

// initializeChannelWithSchema creates a new channel for the given table if it doesn't exist.
// If the channel already exists, it checks for new columns and adds them to the table.
// It returns the channel response after creating or recreating the channel.
@@ -66,7 +72,7 @@
snowflakeManager.Cleanup(ctx)
}()
if err = snowflakeManager.AddColumns(ctx, tableName, columns); err != nil {
return fmt.Errorf("adding column: %w", err)
return fmt.Errorf("adding column: %w, %w", errAuthz, err)

Check warning on line 75 in router/batchrouter/asyncdestinationmanager/snowpipestreaming/channel.go

View check run for this annotation

Codecov / codecov/patch

router/batchrouter/asyncdestinationmanager/snowpipestreaming/channel.go#L75

Added line #L75 was not covered by tests
}
return nil
}
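
Note: the double %w relies on Go 1.20+ multi-error wrapping: errors.Is can then match either the errAuthz sentinel or the underlying cause. A minimal, self-contained sketch of the pattern (the sentinel name mirrors this PR; the cause is illustrative):

```go
package main

import (
	"errors"
	"fmt"
)

var errAuthz = errors.New("snowpipe authorization error")

func main() {
	cause := errors.New("insufficient privileges")
	// Wrapping two errors at once puts both into the error tree.
	err := fmt.Errorf("adding column: %w, %w", errAuthz, cause)
	fmt.Println(errors.Is(err, errAuthz)) // true: the sentinel is detectable
	fmt.Println(errors.Is(err, cause))    // true: the original cause is preserved
}
```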
@@ -157,10 +163,10 @@
snowflakeManager.Cleanup(ctx)
}()
if err := snowflakeManager.CreateSchema(ctx); err != nil {
return nil, fmt.Errorf("creating schema: %w", err)
return nil, fmt.Errorf("creating schema: %w, %w", errAuthz, err)
}
if err := snowflakeManager.CreateTable(ctx, channelReq.TableConfig.Table, eventSchema); err != nil {
return nil, fmt.Errorf("creating table: %w", err)
return nil, fmt.Errorf("creating table: %w, %w", errAuthz, err)

Check warning on line 169 in router/batchrouter/asyncdestinationmanager/snowpipestreaming/channel.go

View check run for this annotation

Codecov / codecov/patch

router/batchrouter/asyncdestinationmanager/snowpipestreaming/channel.go#L169

Added line #L169 was not covered by tests
}
return m.api.CreateChannel(ctx, channelReq)
}
@@ -185,7 +191,7 @@
snowflakeManager.Cleanup(ctx)
}()
if err := snowflakeManager.CreateTable(ctx, channelReq.TableConfig.Table, eventSchema); err != nil {
return nil, fmt.Errorf("creating table: %w", err)
return nil, fmt.Errorf("creating table: %w, %w", errAuthz, err)

Check warning on line 194 in router/batchrouter/asyncdestinationmanager/snowpipestreaming/channel.go

View check run for this annotation

Codecov / codecov/patch

router/batchrouter/asyncdestinationmanager/snowpipestreaming/channel.go#L194

Added line #L194 was not covered by tests
}
return m.api.CreateChannel(ctx, channelReq)
}
@@ -225,6 +231,9 @@
}

func (m *Manager) createSnowflakeManager(ctx context.Context, namespace string) (manager.Manager, error) {
+ if m.isInBackoff() {
+ return nil, fmt.Errorf("skipping snowflake manager creation due to backoff: %w", errBackoff)
+ }
modelWarehouse := whutils.ModelWarehouse{
WorkspaceID: m.destination.WorkspaceID,
Destination: *m.destination,
@@ -234,13 +243,5 @@
}
modelWarehouse.Destination.Config["useKeyPairAuth"] = true // Since we are currently only supporting key pair auth

- sf, err := manager.New(whutils.SnowpipeStreaming, m.appConfig, m.logger, m.statsFactory)
- if err != nil {
- return nil, fmt.Errorf("creating snowflake manager: %w", err)
- }
- err = sf.Setup(ctx, modelWarehouse, whutils.NewNoOpUploader())
- if err != nil {
- return nil, fmt.Errorf("setting up snowflake manager: %w", err)
- }
- return sf, nil
+ return m.managerCreator(ctx, modelWarehouse, m.appConfig, m.logger, m.statsFactory)
}
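
Note: every path above (adding columns, creating the schema, creating tables) obtains its manager through createSnowflakeManager, so this single isInBackoff guard short-circuits all Snowflake round-trips for the destination while the backoff window is active, and the wrapped errBackoff lets callers treat the skip as retryable rather than fatal.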
router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming.go
@@ -4,6 +4,7 @@
"bufio"
"context"
stdjson "encoding/json"
"errors"
"fmt"
"net/http"
"os"
@@ -12,6 +13,7 @@
"sync"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/hashicorp/go-retryablehttp"
jsoniter "github.com/json-iterator/go"
"github.com/samber/lo"
@@ -31,20 +33,21 @@
"github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/model"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/timeutil"
"github.com/rudderlabs/rudder-server/warehouse/integrations/manager"
whutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

var json = jsoniter.ConfigCompatibleWithStandardLibrary

func New(
conf *config.Config,
- logger logger.Logger,
+ log logger.Logger,
statsFactory stats.Stats,
destination *backendconfig.DestinationT,
) *Manager {
m := &Manager{
appConfig: conf,
- logger: logger.Child("snowpipestreaming").Withn(
+ logger: log.Child("snowpipestreaming").Withn(
obskit.WorkspaceID(destination.WorkspaceID),
obskit.DestinationID(destination.ID),
obskit.DestinationType(destination.DestinationDefinition.Name),
@@ -67,6 +70,9 @@
m.config.client.retryMax = conf.GetInt("SnowpipeStreaming.Client.retryMax", 5)
m.config.instanceID = conf.GetString("INSTANCE_ID", "1")
m.config.maxBufferCapacity = conf.GetReloadableInt64Var(512*bytesize.KB, bytesize.B, "SnowpipeStreaming.maxBufferCapacity")
+ m.config.backoff.initialInterval = conf.GetReloadableDurationVar(1, time.Second, "SnowpipeStreaming.backoffInitialIntervalInSeconds")
+ m.config.backoff.multiplier = conf.GetReloadableFloat64Var(2.0, "SnowpipeStreaming.backoffMultiplier")
+ m.config.backoff.maxInterval = conf.GetReloadableDurationVar(1, time.Hour, "SnowpipeStreaming.backoffMaxIntervalInHours")

tags := stats.Tags{
"module": "batch_router",
@@ -100,6 +106,17 @@
snowpipeapi.New(m.appConfig, m.statsFactory, m.config.client.url, m.requestDoer),
destination,
)
+ m.managerCreator = func(ctx context.Context, modelWarehouse whutils.ModelWarehouse, conf *config.Config, logger logger.Logger, statsFactory stats.Stats) (manager.Manager, error) {
+ sf, err := manager.New(whutils.SnowpipeStreaming, conf, logger, statsFactory)
+ if err != nil {
+ return nil, fmt.Errorf("creating snowflake manager: %w", err)
+ }
+ err = sf.Setup(ctx, modelWarehouse, whutils.NewNoOpUploader())
+ if err != nil {
+ return nil, fmt.Errorf("setting up snowflake manager: %w", err)
+ }
+ return sf, nil
+ }
return m
}
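
Note: lifting the factory into the managerCreator field creates an injection seam, which is presumably why the construction moved out of createSnowflakeManager. A hypothetical test wiring (mockManager is an illustrative stand-in implementing manager.Manager, not part of this PR):

```go
m := New(conf, log, statsFactory, destination)
m.managerCreator = func(_ context.Context, _ whutils.ModelWarehouse, _ *config.Config, _ logger.Logger, _ stats.Stats) (manager.Manager, error) {
	return &mockManager{}, nil // no live Snowflake connection needed
}
```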

@@ -121,6 +138,10 @@
return client
}

+ func (m *Manager) Now() time.Time {
+ return m.now()
+ }

func (m *Manager) Transform(job *jobsdb.JobT) (string, error) {
return common.GetMarshalledData(string(job.EventPayload), job.JobID)
}
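
Note: exporting Now() is what makes backoff.WithClockProvider(m) in setBackOff below work: cenkalti/backoff/v4 accepts any value satisfying its Clock interface, which is roughly:

```go
// As defined in cenkalti/backoff/v4:
type Clock interface {
	Now() time.Time
}
```

Since Now() delegates to the unexported m.now, tests can steer both the backoff window and timestamp formatting from one place.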
@@ -152,6 +173,12 @@

discardsChannel, err := m.initializeChannelWithSchema(ctx, asyncDest.Destination.ID, &destConf, discardsTable(), discardsSchema())
if err != nil {
+ if errors.Is(err, errAuthz) || errors.Is(err, errBackoff) {
+ if errors.Is(err, errAuthz) {
+ m.setBackOff()
+ }
+ return m.failedJobs(asyncDest, err.Error())
+ }
Review comment (Member) on lines +176 to +181: We can probably avoid doing errors.Is twice.

Suggested change:
- if errors.Is(err, errAuthz) || errors.Is(err, errBackoff) {
- if errors.Is(err, errAuthz) {
- m.setBackOff()
- }
- return m.failedJobs(asyncDest, err.Error())
- }
+ switch {
+ case errors.Is(err, errAuthz):
+ m.setBackOff()
+ return m.failedJobs(asyncDest, err.Error())
+ case errors.Is(err, errBackoff):
+ return m.failedJobs(asyncDest, err.Error())
+ default:
+ return m.abortJobs(asyncDest, fmt.Errorf("failed to prepare discards channel: %w", err).Error())
+ }

return m.abortJobs(asyncDest, fmt.Errorf("failed to prepare discards channel: %w", err).Error())
}
m.logger.Infon("Prepared discards channel")
@@ -184,9 +211,17 @@
importInfos []*importInfo
discardImportInfo *importInfo
)
+ shouldResetBackoff := true // backoff should be reset if authz error is not encountered for any of the tables
+ isBackoffSet := false // should not be set again if already set
Review comment (Member) on the isBackoffSet flag above: Do we need this? It's not being set anywhere?

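Note: the reviewer's question points at a real gap: isBackoffSet is read in the loop below but never assigned, so if several tables hit authorization errors in one Upload pass, setBackOff runs once per failing table and advances the backoff further than a single failure warrants. A hypothetical fix in line with the flag's own comment:

```go
if errors.Is(err, errAuthz) && !isBackoffSet {
	m.setBackOff()
	isBackoffSet = true // hypothetical: avoid re-arming within the same Upload pass
}
```
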
for _, info := range uploadInfos {
imInfo, discardImInfo, err := m.sendEventsToSnowpipe(ctx, asyncDest.Destination.ID, &destConf, info)
if err != nil {
+ if errors.Is(err, errAuthz) || errors.Is(err, errBackoff) {
+ shouldResetBackoff = false
+ if errors.Is(err, errAuthz) && !isBackoffSet {
+ m.setBackOff()
+ }
+ }
m.logger.Warnn("Failed to send events to Snowpipe",
logger.NewStringField("table", info.tableName),
obskit.Error(err),
Expand All @@ -206,6 +241,9 @@
discardImportInfo.Offset = discardImInfo.Offset
}
}
+ if shouldResetBackoff {
+ m.resetBackoff()
+ }
if discardImportInfo != nil {
importInfos = append(importInfos, discardImportInfo)
}
@@ -245,7 +283,7 @@

events := make([]*event, 0, eventsCount)

- formattedTS := m.now().Format(misc.RFC3339Milli)
+ formattedTS := m.Now().Format(misc.RFC3339Milli)
scanner := bufio.NewScanner(file)
scanner.Buffer(nil, int(m.config.maxBufferCapacity.Load()))

@@ -289,7 +327,7 @@
}
log.Infon("Prepared channel", logger.NewStringField("channelID", channelResponse.ChannelID))

- formattedTS := m.now().Format(misc.RFC3339Milli)
+ formattedTS := m.Now().Format(misc.RFC3339Milli)
var discardInfos []discardInfo
for _, tableEvent := range info.events {
discardInfos = append(discardInfos, getDiscardedRecordsFromEvent(tableEvent, channelResponse.SnowpipeSchema, info.tableName, formattedTS)...)
@@ -362,6 +400,16 @@
}
}

+ func (m *Manager) failedJobs(asyncDest *common.AsyncDestinationStruct, failedReason string) common.AsyncUploadOutput {
+ m.stats.jobs.failed.Count(len(asyncDest.ImportingJobIDs))
+ return common.AsyncUploadOutput{
+ FailedJobIDs: asyncDest.ImportingJobIDs,
+ FailedCount: len(asyncDest.ImportingJobIDs),
+ FailedReason: failedReason,
+ DestinationID: asyncDest.Destination.ID,
+ }
+ }
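
Note: returning the batch as failed rather than aborted is the heart of the change: failed jobs stay queued and are retried on a later Upload cycle, by which time the backoff window may have elapsed or the Snowflake permissions may have been fixed, whereas the previous abort path dropped them permanently.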

// Poll checks the status of multiple imports using the import ID from pollInput.
// For the ones that have reached a terminal state (success or failure), it caches the import infos in polledImportInfoMap, so a subsequent Poll call does not need to repeat the status check.
// Once all the imports have reached the terminal state, if any imports have failed, it deletes the channels for those imports.
@@ -549,3 +597,34 @@
},
}
}

+ func (m *Manager) isInBackoff() bool {
+ if m.backoff.next.IsZero() {
+ return false
+ }
+ return m.Now().Before(m.backoff.next)
+ }
+
+ func (m *Manager) resetBackoff() {
+ m.backoff.next = time.Time{}
+ m.backoff.attempts = 0
+ }
+
+ func (m *Manager) setBackOff() {
+ b := backoff.NewExponentialBackOff(
+ backoff.WithInitialInterval(m.config.backoff.initialInterval.Load()),
+ backoff.WithMultiplier(m.config.backoff.multiplier.Load()),
+ backoff.WithClockProvider(m),
+ backoff.WithRandomizationFactor(0),
+ backoff.WithMaxElapsedTime(0),
+ backoff.WithMaxInterval(m.config.backoff.maxInterval.Load()),
+ )
+ b.Reset()
+ m.backoff.attempts++
+
+ var d time.Duration
+ for index := int64(0); index < int64(m.backoff.attempts); index++ {
+ d = b.NextBackOff()
+ }
+ m.backoff.next = m.Now().Add(d)
+ }
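
Note: with the randomization factor pinned to 0, the loop in setBackOff deterministically yields the attempts-th exponential interval. A standalone sketch of the same schedule under the configured defaults (1s initial interval, 2.0 multiplier, 1h cap; the helper below is illustrative, not part of the PR):

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// nextBackoff mirrors what setBackOff's loop computes with no jitter:
// initialInterval * multiplier^(attempts-1), clamped to maxInterval.
func nextBackoff(initial, maxInterval time.Duration, multiplier float64, attempts int) time.Duration {
	d := time.Duration(float64(initial) * math.Pow(multiplier, float64(attempts-1)))
	if d > maxInterval {
		d = maxInterval
	}
	return d
}

func main() {
	// Defaults from this PR: 1s, 2s, 4s, ... ~34m, then capped at 1h.
	for attempts := 1; attempts <= 13; attempts++ {
		fmt.Println(attempts, nextBackoff(time.Second, time.Hour, 2.0, attempts))
	}
}
```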