
feat: implement backoff for snowpipe streaming authorization errors #5399

Open · wants to merge 5 commits into master

Conversation

@shekhar-rudder (Member) commented Dec 27, 2024

Description

This PR introduces a backoff mechanism when creating channels in the Snowpipe streaming destination. If an authorization error is encountered at the schema, table, or column level, the backoff ensures that we avoid repeatedly attempting to connect to Snowflake unnecessarily.

  • Added a new struct authzBackoff to the manager to store backoff-related state.
  • Refactoring: added managerCreator as a field on the manager to make the code testable.
  • Until now, any error encountered while creating the channel for the discards table resulted in the job being marked as aborted. In this PR, the behavior has been updated to mark the job as failed, allowing it to be retried.
  • Backoff is applied whenever an authorization error is encountered.
  • Backoff is cleared whenever channels for all the tables are created without any authorization error (a simplified sketch of this gating follows below).
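
Below is a minimal, hypothetical sketch of that gating. The identifiers (the authzBackoff fields, createChannels, the errAuthz sentinel) and the fixed one-hour interval are illustrative only; the actual implementation uses exponential backoff from cenkalti/backoff/v4 and the manager's own error classification.

```go
package snowpipesketch

import (
	"errors"
	"fmt"
	"time"
)

// errAuthz is a stand-in sentinel for this sketch; the PR classifies authorization errors differently.
var errAuthz = errors.New("authorization error")

// authzBackoff holds the minimal state needed to gate channel creation.
type authzBackoff struct {
	next time.Time // do not attempt channel creation before this instant
}

func (b *authzBackoff) inBackoff(now time.Time) bool          { return now.Before(b.next) }
func (b *authzBackoff) set(now time.Time, d time.Duration)    { b.next = now.Add(d) }
func (b *authzBackoff) reset()                                { b.next = time.Time{} }

// createChannels attempts to create one channel per table, failing fast while in backoff.
func createChannels(b *authzBackoff, tables []string, create func(table string) error) error {
	if b.inBackoff(time.Now()) {
		return errors.New("skipping channel creation: in authorization backoff")
	}
	for _, table := range tables {
		if err := create(table); err != nil {
			if errors.Is(err, errAuthz) {
				b.set(time.Now(), time.Hour) // jobs are marked failed (retryable), not aborted
			}
			return fmt.Errorf("creating channel for %s: %w", table, err)
		}
	}
	b.reset() // all channels created without an authorization error
	return nil
}
```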

Security

  • The code changed/added as part of this pull request won't create any security issues with how the software is being used.

codecov bot commented Dec 27, 2024

Codecov Report

Attention: Patch coverage is 89.02439% with 9 lines in your changes missing coverage. Please review.

Project coverage is 74.80%. Comparing base (8a186e5) to head (b5a1417).
Report is 1 commit behind head on master.

Files with missing lines Patch % Lines
...tionmanager/snowpipestreaming/snowpipestreaming.go 86.36% 4 Missing and 2 partials ⚠️
...yncdestinationmanager/snowpipestreaming/channel.go 80.00% 3 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #5399      +/-   ##
==========================================
- Coverage   74.82%   74.80%   -0.03%     
==========================================
  Files         438      438              
  Lines       61331    61404      +73     
==========================================
+ Hits        45893    45935      +42     
- Misses      12895    12931      +36     
+ Partials     2543     2538       -5     

☔ View full report in Codecov by Sentry.

Comment on lines 174 to 175
backoff.WithMaxElapsedTime(0),
backoff.WithMaxInterval(0),
@RanjeetMishra commented Jan 2, 2025

  1. Should we set MaxInterval to, say, 1 hour? That would mean that once the Snowflake permissions issue is resolved, processing resumes within at most an hour. Also, from a correctness perspective, overflow in the backoff package is capped at MaxInterval (see the sketch below).
  2. Are we using cenkalti/backoff/v4 elsewhere as well? I see that the initialization pattern has changed in v5.

CC: @achettyiitr
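
For reference, a minimal self-contained example of how v4's WithMaxInterval caps successive NextBackOff values, as discussed in point 1 above. The 10-minute initial interval is arbitrary and chosen only for illustration.

```go
package main

import (
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v4"
)

func main() {
	// With randomization disabled, intervals double until they hit MaxInterval and stay there.
	b := backoff.NewExponentialBackOff(
		backoff.WithInitialInterval(10*time.Minute),
		backoff.WithMultiplier(2),
		backoff.WithRandomizationFactor(0),
		backoff.WithMaxElapsedTime(0),      // never return backoff.Stop
		backoff.WithMaxInterval(time.Hour), // cap each interval at 1h
	)
	for i := 0; i < 5; i++ {
		fmt.Println(b.NextBackOff()) // 10m0s, 20m0s, 40m0s, 1h0m0s, 1h0m0s
	}
}
```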

Member

> Are we using cenkalti/backoff/v4 elsewhere as well? I see that the initialization pattern has changed in v5.

In rudder-server, we are still using v4. We haven't upgraded to v5 yet.

Member Author

  1. Made MaxInterval 1 hour.

Comment on lines 223 to 225
if sfConnectionErr.code == errAuthz {
	m.authzBackoff.set()
}


Would this mean we end up invoking .NextBackOff() as many times as the size of uploadInfos?

Member Author

Updated the code to set backoff only once
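
For illustration, the change could look roughly like the sketch below. The helper names (processUploads, createChannel, isAuthzErr, setBackoff) are placeholders, not the actual code.

```go
package snowpipesketch

// processUploads sketches the "set only once" fix: the authorization backoff is advanced
// at most once per Upload call, no matter how many uploadInfos hit an authorization error.
func processUploads(uploadInfos []string, createChannel func(string) error, isAuthzErr func(error) bool, setBackoff func()) {
	sawAuthzErr := false
	for _, info := range uploadInfos {
		if err := createChannel(info); err != nil {
			if isAuthzErr(err) {
				sawAuthzErr = true // remember the failure, but don't advance the backoff yet
			}
			// per-table failure handling (marking jobs failed/aborted) is omitted in this sketch
		}
	}
	if sawAuthzErr {
		setBackoff() // NextBackOff is consumed exactly once
	}
}
```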

Comment on lines +15 to +37
type errCode int

const (
	errBackoff errCode = iota
	errAuthz
)

type snowflakeConnectionErr struct {
	code errCode
	err  error
}

func newSnowflakeConnectionErr(code errCode, err error) *snowflakeConnectionErr {
	return &snowflakeConnectionErr{
		code: code,
		err:  err,
	}
}

func (sae *snowflakeConnectionErr) Error() string {
	return sae.err.Error()
}

@achettyiitr (Member) commented Jan 6, 2025

We can probably simplify this by defining a sentinel error, since the requirement is just to classify the errors; there is no need to expose these error codes to other clients. wdyt?

// Define sentinel error
var errAuth = errors.New("authorization error")

// when sending the error you can do something like this
if err := snowflakeManager.CreateSchema(ctx); err != nil {
	return nil, fmt.Errorf("creating schema: %w, %w", errAuth, err)
}

// For classification
if err != nil {
	if errors.Is(err, errAuth) {
		m.authzBackoff.set()
		return m.failedJobs(asyncDest, err.Error())
	} else {
		return m.abortJobs(asyncDest, fmt.Errorf("failed to prepare discards channel: %w", err).Error())
	}
}

@@ -25,6 +28,7 @@ type (
statsFactory stats.Stats
destination *backendconfig.DestinationT
requestDoer requestDoer
managerCreator func(mCtx context.Context, modelWarehouse whutils.ModelWarehouse, conf *config.Config, mLogger logger.Logger, stats stats.Stats) (manager.Manager, error)
Member

Suggested change
managerCreator func(mCtx context.Context, modelWarehouse whutils.ModelWarehouse, conf *config.Config, mLogger logger.Logger, stats stats.Stats) (manager.Manager, error)
managerCreator func(ctx context.Context, modelWarehouse whutils.ModelWarehouse, conf *config.Config, logger logger.Logger, statsFactory stats.Stats) (manager.Manager, error)

Comment on lines +113 to +123
m.managerCreator = func(mCtx context.Context, modelWarehouse whutils.ModelWarehouse, conf *config.Config, logger logger.Logger, stats stats.Stats) (manager.Manager, error) {
	sf, err := manager.New(whutils.SnowpipeStreaming, conf, logger, stats)
	if err != nil {
		return nil, fmt.Errorf("creating snowflake manager: %w", err)
	}
	err = sf.Setup(mCtx, modelWarehouse, whutils.NewNoOpUploader())
	if err != nil {
		return nil, fmt.Errorf("setting up snowflake manager: %w", err)
	}
	return sf, nil
}
Member

Suggested change
m.managerCreator = func(mCtx context.Context, modelWarehouse whutils.ModelWarehouse, conf *config.Config, logger logger.Logger, stats stats.Stats) (manager.Manager, error) {
	sf, err := manager.New(whutils.SnowpipeStreaming, conf, logger, stats)
	if err != nil {
		return nil, fmt.Errorf("creating snowflake manager: %w", err)
	}
	err = sf.Setup(mCtx, modelWarehouse, whutils.NewNoOpUploader())
	if err != nil {
		return nil, fmt.Errorf("setting up snowflake manager: %w", err)
	}
	return sf, nil
}
m.managerCreator = func(ctx context.Context, modelWarehouse whutils.ModelWarehouse, conf *config.Config, logger logger.Logger, statsFactory stats.Stats) (manager.Manager, error) {
	sf, err := manager.New(whutils.SnowpipeStreaming, conf, logger, statsFactory)
	if err != nil {
		return nil, fmt.Errorf("creating snowflake manager: %w", err)
	}
	err = sf.Setup(ctx, modelWarehouse, whutils.NewNoOpUploader())
	if err != nil {
		return nil, fmt.Errorf("setting up snowflake manager: %w", err)
	}
	return sf, nil
}

Comment on lines +41 to +45
type systemClock struct{}

func (t systemClock) Now() time.Time {
	return timeutil.Now()
}
Member

Can we override this on the Manager struct instead of defining a new struct here?

func (m *Manager) Now() time.Time {
	return m.now()
}

// when using, you can do something like this:
newAuthzBackoff(conf.GetDuration("SnowpipeStreaming.backoffDuration", 1, time.Second), m)

Comment on lines +167 to +197
func newAuthzBackoff(initialInterval time.Duration, clock backoff.Clock) *authzBackoff {
	return &authzBackoff{
		options: []backoff.ExponentialBackOffOpts{
			backoff.WithInitialInterval(initialInterval),
			backoff.WithMultiplier(2),
			backoff.WithClockProvider(clock),
			backoff.WithRandomizationFactor(0),
			backoff.WithMaxElapsedTime(0),
			backoff.WithMaxInterval(time.Hour),
		},
	}
}

func (abe *authzBackoff) set() {
	if abe.backoff == nil {
		abe.backoff = backoff.NewExponentialBackOff(abe.options...)
	}
	// nextBackoff can't be a derived field since every time NextBackOff is called, the internal state of backoff is updated.
	abe.nextBackoff = abe.backoff.NextBackOff()
}

func (abe *authzBackoff) reset() {
	abe.backoff = nil
}

func (abe *authzBackoff) isInBackoff() bool {
	if abe.backoff == nil {
		return false
	}
	return abe.backoff.GetElapsedTime() < abe.nextBackoff
}
@achettyiitr (Member) commented Jan 6, 2025

We can probably simplify this logic: we only need to store a nextBackoff time.Time on the Manager struct and use it to decide whether we are in backoff.

Created a short draft PR since it's difficult to explain here.
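
A rough sketch of the direction being suggested. The draft PR is not shown here, so the field names and the doubling-with-cap policy below are assumptions; this sketch also keeps the current interval alongside the retry-after timestamp, since exponential growth needs it.

```go
package snowpipesketch

import "time"

// Manager keeps just a retry-after timestamp (plus the current interval) instead of
// wrapping backoff.ExponentialBackOff. All names here are assumptions for illustration.
type Manager struct {
	now                func() time.Time // injectable clock, so tests can control time
	initialInterval    time.Duration
	maxBackoffInterval time.Duration
	backoffInterval    time.Duration // zero means "not currently backing off"
	retryAfter         time.Time
}

func (m *Manager) setBackoff() {
	if m.backoffInterval == 0 {
		m.backoffInterval = m.initialInterval
	} else {
		m.backoffInterval *= 2
		if m.backoffInterval > m.maxBackoffInterval {
			m.backoffInterval = m.maxBackoffInterval
		}
	}
	m.retryAfter = m.now().Add(m.backoffInterval)
}

func (m *Manager) resetBackoff() {
	m.backoffInterval = 0
	m.retryAfter = time.Time{}
}

func (m *Manager) isInBackoff() bool {
	return m.now().Before(m.retryAfter)
}
```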

Comment on lines +62 to +68
func (m *mockManager) CreateSchema(ctx context.Context) (err error) {
	return m.createSchemaErr
}

func (m *mockManager) CreateTable(ctx context.Context, tableName string, columnMap whutils.ModelTableSchema) (err error) {
	return nil
}
Member

Suggested change
func (m *mockManager) CreateSchema(ctx context.Context) (err error) {
	return m.createSchemaErr
}
func (m *mockManager) CreateTable(ctx context.Context, tableName string, columnMap whutils.ModelTableSchema) (err error) {
	return nil
}
func (m *mockManager) CreateSchema(context.Context) error {
	return m.createSchemaErr
}
func (m *mockManager) CreateTable(context.Context, string, whutils.ModelTableSchema) error {
	return nil
}

Comment on lines +170 to +175
backoff.WithInitialInterval(initialInterval),
backoff.WithMultiplier(2),
backoff.WithClockProvider(clock),
backoff.WithRandomizationFactor(0),
backoff.WithMaxElapsedTime(0),
backoff.WithMaxInterval(time.Hour),
Member

Can we make these configurable instead of hardcoding them?
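
For example, the hardcoded values could be read from config. A hedged sketch follows: the config key names are made up, conf.GetDuration matches the usage shown earlier in this thread, and conf.GetFloat64 is assumed to take a key and a default value.

```go
package snowpipesketch

import (
	"time"

	"github.com/cenkalti/backoff/v4"

	"github.com/rudderlabs/rudder-go-kit/config"
)

// backoffOptions builds the exponential-backoff options from config instead of hardcoded
// literals. The defaults mirror the values that are currently hardcoded.
func backoffOptions(conf *config.Config, clock backoff.Clock) []backoff.ExponentialBackOffOpts {
	return []backoff.ExponentialBackOffOpts{
		backoff.WithInitialInterval(conf.GetDuration("SnowpipeStreaming.backoffInitialInterval", 1, time.Second)),
		backoff.WithMultiplier(conf.GetFloat64("SnowpipeStreaming.backoffMultiplier", 2)),
		backoff.WithClockProvider(clock),
		backoff.WithRandomizationFactor(0),
		backoff.WithMaxElapsedTime(0),
		backoff.WithMaxInterval(conf.GetDuration("SnowpipeStreaming.backoffMaxInterval", 1, time.Hour)),
	}
}
```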

Comment on lines +393 to +396
sm.Upload(asyncDestStruct)
// client created again since backoff duration has been exceeded
require.Equal(t, 2, managerCreatorCallCount)
require.Equal(t, false, sm.authzBackoff.isInBackoff())
@achettyiitr (Member) commented Jan 6, 2025

Since managerCreator will return an error here, sm.authzBackoff.isInBackoff() should return true, right? Because post upload, the backoff will be set.
