feat: implement backoff for snowpipe streaming authorization errors #5399
base: master
Conversation
Codecov Report

Attention: Patch coverage is

```
@@           Coverage Diff            @@
##           master    #5399     +/-  ##
=========================================
- Coverage   74.82%   74.80%   -0.03%
=========================================
  Files         438      438
  Lines       61331    61404      +73
=========================================
+ Hits        45893    45935      +42
- Misses      12895    12931      +36
+ Partials     2543     2538       -5
```

View full report in Codecov by Sentry.
```go
backoff.WithMaxElapsedTime(0),
backoff.WithMaxInterval(0),
```
- Should we set MaxInterval to, say, 1 hour? That way, once the Snowflake permissions issue is resolved, ingestion would resume within the next hour. Also, from a correctness perspective, overflow in the backoff package is handled as maxInterval.
- Are we using cenkalti/backoff/v4 elsewhere as well? I see that in v5 the initialization pattern has changed.

CC: @achettyiitr
> Are we using cenkalti/backoff/v4 elsewhere as well? I see that in v5 the initialization pattern has changed.

In rudder-server, we are still using v4. We haven't upgraded to v5 yet.
- Made MaxInterval 1 hour.
```go
if sfConnectionErr.code == errAuthz {
	m.authzBackoff.set()
}
```
Would this mean we end up invoking .NextBackOff as many times as the size of uploadInfos?
Updated the code to set backoff only once
```go
type errCode int

const (
	errBackoff errCode = iota
	errAuthz
)

type snowflakeConnectionErr struct {
	code errCode
	err  error
}

func newSnowflakeConnectionErr(code errCode, err error) *snowflakeConnectionErr {
	return &snowflakeConnectionErr{
		code: code,
		err:  err,
	}
}

func (sae *snowflakeConnectionErr) Error() string {
	return sae.err.Error()
}
```
We can probably simplify this by defining a sentinel error, since the requirement is just to classify the errors. There is no requirement to expose these error codes to other clients. wdyt?

```go
// Define a sentinel error
var errAuth = errors.New("authorization error")

// When returning the error, wrap the sentinel alongside the cause:
if err := snowflakeManager.CreateSchema(ctx); err != nil {
	return nil, fmt.Errorf("creating schema: %w, %w", errAuth, err)
}

// For classification:
if err != nil {
	if errors.Is(err, errAuth) {
		m.authzBackoff.set()
		return m.failedJobs(asyncDest, err.Error())
	} else {
		return m.abortJobs(asyncDest, fmt.Errorf("failed to prepare discards channel: %w", err).Error())
	}
}
```
```diff
@@ -25,6 +28,7 @@ type (
 	statsFactory stats.Stats
 	destination  *backendconfig.DestinationT
 	requestDoer  requestDoer
+	managerCreator func(mCtx context.Context, modelWarehouse whutils.ModelWarehouse, conf *config.Config, mLogger logger.Logger, stats stats.Stats) (manager.Manager, error)
```
Suggested change:

```diff
-managerCreator func(mCtx context.Context, modelWarehouse whutils.ModelWarehouse, conf *config.Config, mLogger logger.Logger, stats stats.Stats) (manager.Manager, error)
+managerCreator func(ctx context.Context, modelWarehouse whutils.ModelWarehouse, conf *config.Config, logger logger.Logger, statsFactory stats.Stats) (manager.Manager, error)
```
```go
m.managerCreator = func(mCtx context.Context, modelWarehouse whutils.ModelWarehouse, conf *config.Config, logger logger.Logger, stats stats.Stats) (manager.Manager, error) {
	sf, err := manager.New(whutils.SnowpipeStreaming, conf, logger, stats)
	if err != nil {
		return nil, fmt.Errorf("creating snowflake manager: %w", err)
	}
	err = sf.Setup(mCtx, modelWarehouse, whutils.NewNoOpUploader())
	if err != nil {
		return nil, fmt.Errorf("setting up snowflake manager: %w", err)
	}
	return sf, nil
}
```
Suggested change:

```diff
-m.managerCreator = func(mCtx context.Context, modelWarehouse whutils.ModelWarehouse, conf *config.Config, logger logger.Logger, stats stats.Stats) (manager.Manager, error) {
-	sf, err := manager.New(whutils.SnowpipeStreaming, conf, logger, stats)
+m.managerCreator = func(ctx context.Context, modelWarehouse whutils.ModelWarehouse, conf *config.Config, logger logger.Logger, statsFactory stats.Stats) (manager.Manager, error) {
+	sf, err := manager.New(whutils.SnowpipeStreaming, conf, logger, statsFactory)
 	if err != nil {
 		return nil, fmt.Errorf("creating snowflake manager: %w", err)
 	}
-	err = sf.Setup(mCtx, modelWarehouse, whutils.NewNoOpUploader())
+	err = sf.Setup(ctx, modelWarehouse, whutils.NewNoOpUploader())
 	if err != nil {
 		return nil, fmt.Errorf("setting up snowflake manager: %w", err)
 	}
 	return sf, nil
 }
```
```go
type systemClock struct{}

func (t systemClock) Now() time.Time {
	return timeutil.Now()
}
```
Can we override this on the Manager struct instead of defining a new struct here?

```go
func (m *Manager) Now() time.Time {
	return m.now()
}

// When using it, you can do something like this:
newAuthzBackoff(conf.GetDuration("SnowpipeStreaming.backoffDuration", 1, time.Second), m)
```
```go
func newAuthzBackoff(initialInterval time.Duration, clock backoff.Clock) *authzBackoff {
	return &authzBackoff{
		options: []backoff.ExponentialBackOffOpts{
			backoff.WithInitialInterval(initialInterval),
			backoff.WithMultiplier(2),
			backoff.WithClockProvider(clock),
			backoff.WithRandomizationFactor(0),
			backoff.WithMaxElapsedTime(0),
			backoff.WithMaxInterval(time.Hour),
		},
	}
}

func (abe *authzBackoff) set() {
	if abe.backoff == nil {
		abe.backoff = backoff.NewExponentialBackOff(abe.options...)
	}
	// nextBackoff can't be a derived field since every time NextBackOff is
	// called, the internal state of backoff is updated.
	abe.nextBackoff = abe.backoff.NextBackOff()
}

func (abe *authzBackoff) reset() {
	abe.backoff = nil
}

func (abe *authzBackoff) isInBackoff() bool {
	if abe.backoff == nil {
		return false
	}
	return abe.backoff.GetElapsedTime() < abe.nextBackoff
}
```
We can probably simplify this logic, since we only need to store a nextBackoff time.Time on the Manager struct and use it to decide whether we are in backoff. Created a short draft PR since it's difficult to explain here.
```go
func (m *mockManager) CreateSchema(ctx context.Context) (err error) {
	return m.createSchemaErr
}

func (m *mockManager) CreateTable(ctx context.Context, tableName string, columnMap whutils.ModelTableSchema) (err error) {
	return nil
}
```
Suggested change:

```diff
-func (m *mockManager) CreateSchema(ctx context.Context) (err error) {
+func (m *mockManager) CreateSchema(context.Context) error {
 	return m.createSchemaErr
 }

-func (m *mockManager) CreateTable(ctx context.Context, tableName string, columnMap whutils.ModelTableSchema) (err error) {
+func (m *mockManager) CreateTable(context.Context, string, whutils.ModelTableSchema) error {
 	return nil
 }
```
```go
backoff.WithInitialInterval(initialInterval),
backoff.WithMultiplier(2),
backoff.WithClockProvider(clock),
backoff.WithRandomizationFactor(0),
backoff.WithMaxElapsedTime(0),
backoff.WithMaxInterval(time.Hour),
```
Can we make these configurable instead of hard-coding them?
```go
sm.Upload(asyncDestStruct)
// client created again since backoff duration has been exceeded
require.Equal(t, 2, managerCreatorCallCount)
require.Equal(t, false, sm.authzBackoff.isInBackoff())
```
Since managerCreator will return an error here, sm.authzBackoff.isInBackoff() should return true, right? Because after the upload, backoff will be set again.
Description
This PR introduces a backoff mechanism when creating channels in the Snowpipe streaming destination. If an authorization error is encountered at the schema, table, or column level, the backoff ensures that we avoid repeatedly attempting to connect to Snowflake unnecessarily.
- Added authzBackoff in the manager to store backoff-related state.
- Added managerCreator as a field in the manager to make the code testable.

Security