Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement backoff for snowpipe streaming authorization errors #5399

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@
whutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

type snowpipeAuthzError struct {
err error
}

func (sae *snowpipeAuthzError) Error() string {
return sae.err.Error()
}

// initializeChannelWithSchema creates a new channel for the given table if it doesn't exist.
// If the channel already exists, it checks for new columns and adds them to the table.
// It returns the channel response after creating or recreating the channel.
Expand Down Expand Up @@ -66,7 +74,8 @@
snowflakeManager.Cleanup(ctx)
}()
if err = snowflakeManager.AddColumns(ctx, tableName, columns); err != nil {
return fmt.Errorf("adding column: %w", err)
m.authzBackoff.set()
return &snowpipeAuthzError{fmt.Errorf("adding column: %w", err)}

Check warning on line 78 in router/batchrouter/asyncdestinationmanager/snowpipestreaming/channel.go

View check run for this annotation

Codecov / codecov/patch

router/batchrouter/asyncdestinationmanager/snowpipestreaming/channel.go#L77-L78

Added lines #L77 - L78 were not covered by tests
}
return nil
}
Expand Down Expand Up @@ -157,10 +166,12 @@
snowflakeManager.Cleanup(ctx)
}()
if err := snowflakeManager.CreateSchema(ctx); err != nil {
return nil, fmt.Errorf("creating schema: %w", err)
m.authzBackoff.set()
return nil, &snowpipeAuthzError{fmt.Errorf("creating schema: %w", err)}
}
if err := snowflakeManager.CreateTable(ctx, channelReq.TableConfig.Table, eventSchema); err != nil {
return nil, fmt.Errorf("creating table: %w", err)
m.authzBackoff.set()
return nil, &snowpipeAuthzError{fmt.Errorf("creating table: %w", err)}

Check warning on line 174 in router/batchrouter/asyncdestinationmanager/snowpipestreaming/channel.go

View check run for this annotation

Codecov / codecov/patch

router/batchrouter/asyncdestinationmanager/snowpipestreaming/channel.go#L173-L174

Added lines #L173 - L174 were not covered by tests
}
return m.api.CreateChannel(ctx, channelReq)
}
Expand All @@ -185,7 +196,8 @@
snowflakeManager.Cleanup(ctx)
}()
if err := snowflakeManager.CreateTable(ctx, channelReq.TableConfig.Table, eventSchema); err != nil {
return nil, fmt.Errorf("creating table: %w", err)
m.authzBackoff.set()
return nil, &snowpipeAuthzError{fmt.Errorf("creating table: %w", err)}

Check warning on line 200 in router/batchrouter/asyncdestinationmanager/snowpipestreaming/channel.go

View check run for this annotation

Codecov / codecov/patch

router/batchrouter/asyncdestinationmanager/snowpipestreaming/channel.go#L199-L200

Added lines #L199 - L200 were not covered by tests
}
return m.api.CreateChannel(ctx, channelReq)
}
Expand Down Expand Up @@ -225,6 +237,9 @@
}

func (m *Manager) createSnowflakeManager(ctx context.Context, namespace string) (manager.Manager, error) {
if m.now().Before(m.authzBackoff.nextBackoffTime()) {
return nil, &snowpipeAuthzError{fmt.Errorf("skipping snowflake manager creation due to backoff")}
}
modelWarehouse := whutils.ModelWarehouse{
WorkspaceID: m.destination.WorkspaceID,
Destination: *m.destination,
Expand All @@ -234,13 +249,5 @@
}
modelWarehouse.Destination.Config["useKeyPairAuth"] = true // Since we are currently only supporting key pair auth

sf, err := manager.New(whutils.SnowpipeStreaming, m.appConfig, m.logger, m.statsFactory)
if err != nil {
return nil, fmt.Errorf("creating snowflake manager: %w", err)
}
err = sf.Setup(ctx, modelWarehouse, whutils.NewNoOpUploader())
if err != nil {
return nil, fmt.Errorf("setting up snowflake manager: %w", err)
}
return sf, nil
return m.managerCreator(ctx, modelWarehouse, m.appConfig, m.logger, m.statsFactory)
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
"bufio"
"context"
stdjson "encoding/json"
"errors"
"fmt"
"net/http"
"os"
Expand Down Expand Up @@ -31,20 +32,21 @@
"github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/model"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/timeutil"
"github.com/rudderlabs/rudder-server/warehouse/integrations/manager"
whutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

var json = jsoniter.ConfigCompatibleWithStandardLibrary

func New(
conf *config.Config,
logger logger.Logger,
mLogger logger.Logger,
achettyiitr marked this conversation as resolved.
Show resolved Hide resolved
statsFactory stats.Stats,
destination *backendconfig.DestinationT,
) *Manager {
m := &Manager{
appConfig: conf,
logger: logger.Child("snowpipestreaming").Withn(
logger: mLogger.Child("snowpipestreaming").Withn(
obskit.WorkspaceID(destination.WorkspaceID),
obskit.DestinationID(destination.ID),
obskit.DestinationType(destination.DestinationDefinition.Name),
Expand All @@ -67,6 +69,7 @@
m.config.client.retryMax = conf.GetInt("SnowpipeStreaming.Client.retryMax", 5)
m.config.instanceID = conf.GetString("INSTANCE_ID", "1")
m.config.maxBufferCapacity = conf.GetReloadableInt64Var(512*bytesize.KB, bytesize.B, "SnowpipeStreaming.maxBufferCapacity")
m.authzBackoff = newAuthzBackoff(conf.GetDuration("SnowpipeStreaming.backoffDuration", 1, time.Second))

tags := stats.Tags{
"module": "batch_router",
Expand Down Expand Up @@ -100,6 +103,17 @@
snowpipeapi.New(m.appConfig, m.statsFactory, m.config.client.url, m.requestDoer),
destination,
)
m.managerCreator = func(mCtx context.Context, modelWarehouse whutils.ModelWarehouse, conf *config.Config, logger logger.Logger, stats stats.Stats) (manager.Manager, error) {
sf, err := manager.New(whutils.SnowpipeStreaming, conf, logger, stats)
if err != nil {
return nil, fmt.Errorf("creating snowflake manager: %w", err)
}

Check warning on line 110 in router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming.go

View check run for this annotation

Codecov / codecov/patch

router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming.go#L109-L110

Added lines #L109 - L110 were not covered by tests
err = sf.Setup(mCtx, modelWarehouse, whutils.NewNoOpUploader())
if err != nil {
return nil, fmt.Errorf("setting up snowflake manager: %w", err)
}

Check warning on line 114 in router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming.go

View check run for this annotation

Codecov / codecov/patch

router/batchrouter/asyncdestinationmanager/snowpipestreaming/snowpipestreaming.go#L113-L114

Added lines #L113 - L114 were not covered by tests
return sf, nil
}
achettyiitr marked this conversation as resolved.
Show resolved Hide resolved
return m
}

Expand Down Expand Up @@ -149,10 +163,23 @@

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// backoff should be reset if authz error is not encountered for any of the tables
shouldResetBackoff := true

discardsChannel, err := m.initializeChannelWithSchema(ctx, asyncDest.Destination.ID, &destConf, discardsTable(), discardsSchema())
if err != nil {
return m.abortJobs(asyncDest, fmt.Errorf("failed to prepare discards channel: %w", err).Error())
var authzErr *snowpipeAuthzError
if errors.As(err, &authzErr) {
// Ignoring this error so that the jobs are marked as failed and not aborted since
// we want these jobs to be retried the next time.
m.logger.Warnn("Failed to initialize channel with schema",
logger.NewStringField("table", discardsTable()),
obskit.Error(err),
)
shouldResetBackoff = false
achettyiitr marked this conversation as resolved.
Show resolved Hide resolved
} else {
return m.abortJobs(asyncDest, fmt.Errorf("failed to prepare discards channel: %w", err).Error())
}
}
m.logger.Infon("Prepared discards channel")

Expand Down Expand Up @@ -187,6 +214,12 @@
for _, info := range uploadInfos {
imInfo, discardImInfo, err := m.sendEventsToSnowpipe(ctx, asyncDest.Destination.ID, &destConf, info)
if err != nil {
var authzErr *snowpipeAuthzError
if errors.As(err, &authzErr) {
if shouldResetBackoff {
shouldResetBackoff = false
}
}
achettyiitr marked this conversation as resolved.
Show resolved Hide resolved
m.logger.Warnn("Failed to send events to Snowpipe",
logger.NewStringField("table", info.tableName),
obskit.Error(err),
Expand All @@ -206,6 +239,9 @@
discardImportInfo.Offset = discardImInfo.Offset
}
}
if shouldResetBackoff {
m.authzBackoff.reset()
}
if discardImportInfo != nil {
importInfos = append(importInfos, discardImportInfo)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package snowpipestreaming

import (
"context"
"fmt"
"net/http"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -16,7 +18,10 @@ import (
backendconfig "github.com/rudderlabs/rudder-server/backend-config"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/common"
internalapi "github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/api"
"github.com/rudderlabs/rudder-server/router/batchrouter/asyncdestinationmanager/snowpipestreaming/internal/model"
"github.com/rudderlabs/rudder-server/warehouse/integrations/manager"
"github.com/rudderlabs/rudder-server/warehouse/integrations/snowflake"
whutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

Expand All @@ -43,6 +48,29 @@ func (m *mockAPI) GetStatus(_ context.Context, channelID string) (*model.StatusR
return m.getStatusOutputMap[channelID]()
}

type mockManager struct {
manager.Manager
throwSchemaErr bool
achettyiitr marked this conversation as resolved.
Show resolved Hide resolved
}

func newMockManager(m manager.Manager, throwSchemaErr bool) *mockManager {
return &mockManager{
Manager: m,
throwSchemaErr: throwSchemaErr,
}
}

func (m *mockManager) CreateSchema(ctx context.Context) (err error) {
if m.throwSchemaErr {
return fmt.Errorf("failed to create schema")
}
return nil
}

func (m *mockManager) CreateTable(ctx context.Context, tableName string, columnMap manager.ModelTableSchema) (err error) {
return nil
}

var (
usersChannelResponse = &model.ChannelResponse{
ChannelID: "test-users-channel",
Expand Down Expand Up @@ -77,6 +105,7 @@ func TestSnowpipeStreaming(t *testing.T) {
DestinationDefinition: backendconfig.DestinationDefinitionT{
Name: "SNOWPIPE_STREAMING",
},
Config: make(map[string]interface{}),
}

t.Run("Upload with invalid file path", func(t *testing.T) {
Expand All @@ -100,6 +129,7 @@ func TestSnowpipeStreaming(t *testing.T) {
"status": "aborted",
}).LastValue())
})

t.Run("Upload with invalid record in file", func(t *testing.T) {
statsStore, err := memstats.New()
require.NoError(t, err)
Expand Down Expand Up @@ -310,6 +340,96 @@ func TestSnowpipeStreaming(t *testing.T) {
"status": "failed",
}).LastValue())
})

t.Run("Upload with unauthorized schema error should add backoff", func(t *testing.T) {
statsStore, err := memstats.New()
require.NoError(t, err)

sm := New(config.New(), logger.NOP, statsStore, destination)
sm.channelCache.Store("RUDDER_DISCARDS", rudderDiscardsChannelResponse)
sm.api = &mockAPI{
createChannelOutputMap: map[string]func() (*model.ChannelResponse, error){
"USERS": func() (*model.ChannelResponse, error) {
return &model.ChannelResponse{Code: internalapi.ErrSchemaDoesNotExistOrNotAuthorized}, nil
},
},
}
managerCreatorCallCount := 0
sm.managerCreator = func(_ context.Context, _ whutils.ModelWarehouse, _ *config.Config, _ logger.Logger, _ stats.Stats) (manager.Manager, error) {
sm := snowflake.New(config.New(), logger.NOP, stats.NOP)
managerCreatorCallCount++
return newMockManager(sm, true), nil
}
sm.authzBackoff = newAuthzBackoff(time.Second * 10)
asyncDestStruct := &common.AsyncDestinationStruct{
Destination: destination,
FileName: "testdata/successful_user_records.txt",
}
output1 := sm.Upload(asyncDestStruct)
require.Equal(t, 2, output1.FailedCount)
require.Equal(t, 0, output1.AbortCount)
require.Equal(t, 1, managerCreatorCallCount)
require.Equal(t, time.Second*10, sm.authzBackoff.backoffDuration)
require.Equal(t, false, sm.authzBackoff.lastestErrorTime.IsZero())

sm.Upload(asyncDestStruct)
// client is not created again due to backoff error
require.Equal(t, 1, managerCreatorCallCount)
require.Equal(t, time.Second*10, sm.authzBackoff.backoffDuration)
require.Equal(t, false, sm.authzBackoff.lastestErrorTime.IsZero())

sm.now = func() time.Time {
return time.Now().UTC().Add(time.Second * 100)
}

sm.Upload(asyncDestStruct)
// client created again since backoff duration has been exceeded
require.Equal(t, 2, managerCreatorCallCount)
require.Equal(t, time.Second*20, sm.authzBackoff.backoffDuration)
require.Equal(t, false, sm.authzBackoff.lastestErrorTime.IsZero())

sm.managerCreator = func(_ context.Context, _ whutils.ModelWarehouse, _ *config.Config, _ logger.Logger, _ stats.Stats) (manager.Manager, error) {
sm := snowflake.New(config.New(), logger.NOP, stats.NOP)
managerCreatorCallCount++
return newMockManager(sm, false), nil
}
sm.now = func() time.Time {
return time.Now().UTC().Add(time.Second * 200)
}
sm.Upload(asyncDestStruct)
require.Equal(t, 3, managerCreatorCallCount)
// no error should reset the backoff config
require.Equal(t, time.Duration(0), sm.authzBackoff.backoffDuration)
require.Equal(t, true, sm.authzBackoff.lastestErrorTime.IsZero())
})

t.Run("Upload with discards table authorization error should not abort the job", func(t *testing.T) {
statsStore, err := memstats.New()
require.NoError(t, err)

sm := New(config.New(), logger.NOP, statsStore, destination)
sm.api = &mockAPI{
createChannelOutputMap: map[string]func() (*model.ChannelResponse, error){
"RUDDER_DISCARDS": func() (*model.ChannelResponse, error) {
return &model.ChannelResponse{Code: internalapi.ErrSchemaDoesNotExistOrNotAuthorized}, nil
},
"USERS": func() (*model.ChannelResponse, error) {
return &model.ChannelResponse{Code: internalapi.ErrSchemaDoesNotExistOrNotAuthorized}, nil
},
},
}
sm.managerCreator = func(_ context.Context, _ whutils.ModelWarehouse, _ *config.Config, _ logger.Logger, _ stats.Stats) (manager.Manager, error) {
sm := snowflake.New(config.New(), logger.NOP, stats.NOP)
return newMockManager(sm, true), nil
}
output := sm.Upload(&common.AsyncDestinationStruct{
Destination: destination,
FileName: "testdata/successful_user_records.txt",
})
require.Equal(t, 2, output.FailedCount)
require.Equal(t, 0, output.AbortCount)
})

t.Run("Upload insert error for all events", func(t *testing.T) {
statsStore, err := memstats.New()
require.NoError(t, err)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"message":{"metadata":{"table":"USERS","columns":{"ID":"int","NAME":"string","AGE":"int","RECEIVED_AT":"datetime"}},"data":{"ID":1,"NAME":"Alice","AGE":30,"RECEIVED_AT":"2023-05-12T04:36:50.199Z"}},"metadata":{"job_id":1001}}
{"message":{"metadata":{"table":"USERS","columns":{"ID":"int","NAME":"string","AGE":"int","RECEIVED_AT":"datetime"}},"data":{"ID":1,"NAME":"Alice","AGE":30,"RECEIVED_AT":"2023-05-12T04:36:50.199Z"}},"metadata":{"job_id":1003}}
Loading
Loading