Skip to content

Commit

Permalink
refactor: review comments 1
Browse files Browse the repository at this point in the history
  • Loading branch information
shekhar-rudder committed Dec 30, 2024
1 parent 9df59d0 commit aa4f042
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ var json = jsoniter.ConfigCompatibleWithStandardLibrary

func New(
conf *config.Config,
mLogger logger.Logger,
log logger.Logger,
statsFactory stats.Stats,
destination *backendconfig.DestinationT,
) *Manager {
m := &Manager{
appConfig: conf,
logger: mLogger.Child("snowpipestreaming").Withn(
logger: log.Child("snowpipestreaming").Withn(
obskit.WorkspaceID(destination.WorkspaceID),
obskit.DestinationID(destination.ID),
obskit.DestinationType(destination.DestinationDefinition.Name),
Expand Down Expand Up @@ -170,13 +170,7 @@ func (m *Manager) Upload(asyncDest *common.AsyncDestinationStruct) common.AsyncU
if err != nil {
var authzErr *snowpipeAuthzError
if errors.As(err, &authzErr) {
// Ignoring this error so that the jobs are marked as failed and not aborted since
// we want these jobs to be retried the next time.
m.logger.Warnn("Failed to initialize channel with schema",
logger.NewStringField("table", discardsTable()),
obskit.Error(err),
)
shouldResetBackoff = false
return m.failedJobs(asyncDest, err.Error())
} else {
return m.abortJobs(asyncDest, fmt.Errorf("failed to prepare discards channel: %w", err).Error())
}
Expand Down Expand Up @@ -216,9 +210,7 @@ func (m *Manager) Upload(asyncDest *common.AsyncDestinationStruct) common.AsyncU
if err != nil {
var authzErr *snowpipeAuthzError
if errors.As(err, &authzErr) {
if shouldResetBackoff {
shouldResetBackoff = false
}
shouldResetBackoff = false
}
m.logger.Warnn("Failed to send events to Snowpipe",
logger.NewStringField("table", info.tableName),
Expand Down Expand Up @@ -398,6 +390,16 @@ func (m *Manager) abortJobs(asyncDest *common.AsyncDestinationStruct, abortReaso
}
}

// failedJobs marks all importing jobs as failed (not aborted) so that they
// are retried on the next upload attempt, and records the failure count.
// Unlike abortJobs, this is used for transient errors (e.g. authorization
// failures) where a retry is expected to succeed.
func (m *Manager) failedJobs(asyncDest *common.AsyncDestinationStruct, failedReason string) common.AsyncUploadOutput {
	m.stats.jobs.failed.Count(len(asyncDest.ImportingJobIDs))
	return common.AsyncUploadOutput{
		// Use the Failed* fields (not Abort*) so the jobs stay retryable.
		FailedJobIDs:  asyncDest.ImportingJobIDs,
		FailedCount:   len(asyncDest.ImportingJobIDs),
		FailedReason:  failedReason,
		DestinationID: asyncDest.Destination.ID,
	}
}

// Poll checks the status of multiple imports using the import ID from pollInput.
// For the ones which have reached the terminal state (success or failure), it caches the import infos in polledImportInfoMap. Later if Poll is called again, it does not need to do the status check again.
// Once all the imports have reached the terminal state, if any imports have failed, it deletes the channels for those imports.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,24 +50,20 @@ func (m *mockAPI) GetStatus(_ context.Context, channelID string) (*model.StatusR

type mockManager struct {
manager.Manager
throwSchemaErr bool
createSchemaErr error
}

func newMockManager(m manager.Manager, throwSchemaErr bool) *mockManager {
func newMockManager(m manager.Manager) *mockManager {
return &mockManager{
Manager: m,
throwSchemaErr: throwSchemaErr,
Manager: m,
}
}

func (m *mockManager) CreateSchema(ctx context.Context) (err error) {
if m.throwSchemaErr {
return fmt.Errorf("failed to create schema")
}
return nil
return m.createSchemaErr
}

func (m *mockManager) CreateTable(ctx context.Context, tableName string, columnMap manager.ModelTableSchema) (err error) {
// CreateTable is a no-op stub satisfying manager.Manager for tests;
// it always reports success regardless of the table name or schema.
func (m *mockManager) CreateTable(ctx context.Context, tableName string, columnMap whutils.ModelTableSchema) (err error) {
	return nil
}

Expand Down Expand Up @@ -358,7 +354,9 @@ func TestSnowpipeStreaming(t *testing.T) {
sm.managerCreator = func(_ context.Context, _ whutils.ModelWarehouse, _ *config.Config, _ logger.Logger, _ stats.Stats) (manager.Manager, error) {
sm := snowflake.New(config.New(), logger.NOP, stats.NOP)
managerCreatorCallCount++
return newMockManager(sm, true), nil
mockManager := newMockManager(sm)
mockManager.createSchemaErr = fmt.Errorf("failed to create schema")
return mockManager, nil
}
sm.authzBackoff = newAuthzBackoff(time.Second * 10)
asyncDestStruct := &common.AsyncDestinationStruct{
Expand Down Expand Up @@ -391,7 +389,7 @@ func TestSnowpipeStreaming(t *testing.T) {
sm.managerCreator = func(_ context.Context, _ whutils.ModelWarehouse, _ *config.Config, _ logger.Logger, _ stats.Stats) (manager.Manager, error) {
sm := snowflake.New(config.New(), logger.NOP, stats.NOP)
managerCreatorCallCount++
return newMockManager(sm, false), nil
return newMockManager(sm), nil
}
sm.now = func() time.Time {
return time.Now().UTC().Add(time.Second * 200)
Expand Down Expand Up @@ -420,7 +418,9 @@ func TestSnowpipeStreaming(t *testing.T) {
}
sm.managerCreator = func(_ context.Context, _ whutils.ModelWarehouse, _ *config.Config, _ logger.Logger, _ stats.Stats) (manager.Manager, error) {
sm := snowflake.New(config.New(), logger.NOP, stats.NOP)
return newMockManager(sm, true), nil
mockManager := newMockManager(sm)
mockManager.createSchemaErr = fmt.Errorf("failed to create schema")
return mockManager, nil
}
output := sm.Upload(&common.AsyncDestinationStruct{
Destination: destination,
Expand Down
3 changes: 0 additions & 3 deletions warehouse/integrations/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@ import (
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

// Creating an alias since "model.TableSchema" is defined in an internal module
type ModelTableSchema = model.TableSchema

type Manager interface {
Setup(ctx context.Context, warehouse model.Warehouse, uploader warehouseutils.Uploader) error
FetchSchema(ctx context.Context) (model.Schema, error)
Expand Down

0 comments on commit aa4f042

Please sign in to comment.