
Commit

feat: warehouse transformer
achettyiitr committed Oct 18, 2024
1 parent 9683021 commit 1a41ba4
Showing 115 changed files with 15,829 additions and 315 deletions.
4 changes: 2 additions & 2 deletions go.mod
Expand Up @@ -41,6 +41,7 @@ require (
github.com/databricks/databricks-sql-go v1.6.1
github.com/denisenkom/go-mssqldb v0.12.3
github.com/dgraph-io/badger/v4 v4.3.1
github.com/dlclark/regexp2 v1.11.4
github.com/docker/docker v27.3.1+incompatible
github.com/go-chi/chi/v5 v5.1.0
github.com/go-redis/redis v6.15.9+incompatible
Expand Down Expand Up @@ -76,7 +77,7 @@ require (
github.com/rudderlabs/analytics-go v3.3.3+incompatible
github.com/rudderlabs/bing-ads-go-sdk v0.2.3
github.com/rudderlabs/compose-test v0.1.3
github.com/rudderlabs/rudder-go-kit v0.43.0
github.com/rudderlabs/rudder-go-kit v0.43.1-0.20241017045502-08a98c5f8442
github.com/rudderlabs/rudder-observability-kit v0.0.3
github.com/rudderlabs/rudder-schemas v0.5.3
github.com/rudderlabs/rudder-transformer/go v0.0.0-20240910055720-f77d2ab4125a
Expand Down Expand Up @@ -187,7 +188,6 @@ require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/dgraph-io/ristretto v1.0.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/dlclark/regexp2 v1.11.4 // indirect
github.com/dnephin/pflag v1.0.7 // indirect
github.com/docker/cli v27.2.1+incompatible // indirect
github.com/docker/cli-docs-tool v0.8.0 // indirect
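github.com/dlclark/regexp2 moves from the indirect block to the direct requirements, which suggests the new warehouse transformer code imports it directly — regexp2 accepts ECMAScript-style constructs such as lookbehind that the standard library's RE2-based regexp package rejects. A minimal sketch of that kind of usage; the pattern and names below are illustrative, not taken from this commit:

```go
package main

import (
	"fmt"

	"github.com/dlclark/regexp2"
)

func main() {
	// Lookbehind assertions compile with regexp2 but are rejected by Go's stdlib regexp.
	re := regexp2.MustCompile(`(?<=users_)[a-z_]+`, regexp2.None)

	m, err := re.FindStringMatch("users_identifies")
	if err != nil {
		panic(err)
	}
	if m != nil {
		fmt.Println(m.String()) // identifies
	}
}
```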
4 changes: 2 additions & 2 deletions go.sum
Expand Up @@ -1168,8 +1168,8 @@ github.com/rudderlabs/goqu/v10 v10.3.1 h1:rnfX+b4EwBWQ2UQfIGeEW299JBBkK5biEbnf7K
github.com/rudderlabs/goqu/v10 v10.3.1/go.mod h1:LH2vI5gGHBxEQuESqFyk5ZA2anGINc8o25hbidDWOYw=
github.com/rudderlabs/parquet-go v0.0.2 h1:ZXRdZdimB0PdJtmxeSSxfI0fDQ3kZjwzBxRi6Ut1J8k=
github.com/rudderlabs/parquet-go v0.0.2/go.mod h1:g6guum7o8uhj/uNhunnt7bw5Vabu/goI5i21/3fnxWQ=
github.com/rudderlabs/rudder-go-kit v0.43.0 h1:N6CAvQdjufitdiUl424+AcMebEmieB0TO5PhARwXvw8=
github.com/rudderlabs/rudder-go-kit v0.43.0/go.mod h1:NrHCi0KSzHSMFXQu0t2kgJcE4ClAKklVXfb2glADvQ4=
github.com/rudderlabs/rudder-go-kit v0.43.1-0.20241017045502-08a98c5f8442 h1:WAYL/6chiRSIeKwSNGd9sclWNWbKBwenGbUhiyxQIi4=
github.com/rudderlabs/rudder-go-kit v0.43.1-0.20241017045502-08a98c5f8442/go.mod h1:NrHCi0KSzHSMFXQu0t2kgJcE4ClAKklVXfb2glADvQ4=
github.com/rudderlabs/rudder-observability-kit v0.0.3 h1:vZtuZRkGX+6rjaeKtxxFE2YYP6QlmAcVcgecTOjvz+Q=
github.com/rudderlabs/rudder-observability-kit v0.0.3/go.mod h1:6UjAh3H6rkE0fFLh7z8ZGQEQbKtUkRfhWOf/OUhfqW8=
github.com/rudderlabs/rudder-schemas v0.5.3 h1:IWWjAo2TzsjwHNhS2EAr1+0MjvA8BoTpJvB2o/GFwNU=
20 changes: 16 additions & 4 deletions processor/transformer/transformer.go
Expand Up @@ -140,13 +140,25 @@ func WithClient(client *http.Client) Opt {
}
}

// Transformer provides methods to transform events
type Transformer interface {
Transform(ctx context.Context, clientEvents []TransformerEvent, batchSize int) Response
type UserTransformer interface {
UserTransform(ctx context.Context, clientEvents []TransformerEvent, batchSize int) Response
}

type DestinationTransformer interface {
Transform(ctx context.Context, clientEvents []TransformerEvent, batchSize int) Response
}

type TrackingPlanValidator interface {
Validate(ctx context.Context, clientEvents []TransformerEvent, batchSize int) Response
}

// Transformer provides methods to transform events
type Transformer interface {
UserTransformer
DestinationTransformer
TrackingPlanValidator
}

// handle is the handle for this class
type handle struct {
sentStat stats.Measurement
Expand Down Expand Up @@ -526,7 +538,7 @@ func (trans *handle) destTransformURL(destType string) string {
destinationEndPoint := fmt.Sprintf("%s/v0/destinations/%s", trans.config.destTransformationURL, strings.ToLower(destType))

if _, ok := warehouseutils.WarehouseDestinationMap[destType]; ok {
whSchemaVersionQueryParam := fmt.Sprintf("whSchemaVersion=%s&whIDResolve=%v", trans.conf.GetString("Warehouse.schemaVersion", "v1"), warehouseutils.IDResolutionEnabled())
whSchemaVersionQueryParam := fmt.Sprintf("whIDResolve=%v", trans.conf.GetBool("Warehouse.enableIDResolution", false))
switch destType {
case warehouseutils.RS:
return destinationEndPoint + "?" + whSchemaVersionQueryParam
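The commit splits the old Transformer interface into three role-specific interfaces — UserTransformer, DestinationTransformer and TrackingPlanValidator — with Transformer now embedding all three, so callers can depend only on the capability they actually use. A hedged sketch of how a consumer might take the narrower interface; the helper below is illustrative, not code from this commit:

```go
package example

import (
	"context"

	"github.com/rudderlabs/rudder-server/processor/transformer"
)

// destinationTransformStage only performs destination transformations, so it
// accepts the narrow DestinationTransformer interface rather than the full
// Transformer, which keeps test doubles and alternative implementations small.
func destinationTransformStage(
	ctx context.Context,
	t transformer.DestinationTransformer,
	events []transformer.TransformerEvent,
	batchSize int,
) transformer.Response {
	return t.Transform(ctx, events, batchSize)
}
```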
4 changes: 2 additions & 2 deletions warehouse/api/grpc.go
Expand Up @@ -679,13 +679,13 @@ func (g *GRPC) ValidateObjectStorageDestination(ctx context.Context, request *pr
byt, err := json.Marshal(request)
if err != nil {
return &proto.ValidateObjectStorageResponse{},
status.Errorf(codes.Code(code.Code_INVALID_ARGUMENT), "unable to marshal the request proto message with error: \n%s", err.Error())
status.Errorf(codes.Code(code.Code_INVALID_ARGUMENT), "unable to marshal the request proto message with error: \n%s", err.Error())
}

var validateRequest validateObjectStorageRequest
if err := json.Unmarshal(byt, &validateRequest); err != nil {
return &proto.ValidateObjectStorageResponse{},
status.Errorf(codes.Code(code.Code_INVALID_ARGUMENT), "unable to extract data into validation request with error: \n%s", err)
status.Errorf(codes.Code(code.Code_INVALID_ARGUMENT), "unable to extract data into validation request with error: \n%s", err)
}

switch request.Type {
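ValidateObjectStorageDestination converts the incoming proto request into the internal validateObjectStorageRequest by round-tripping it through JSON: marshal the proto message, then unmarshal into the struct whose json tags mirror the proto field names. A compact sketch of that pattern; the helper does not exist in the file, the request type is abbreviated in the hunk header above, and the file's existing json import is assumed:

```go
// protoToValidateRequest mirrors the round trip above: serialize the
// proto-backed request to JSON, then decode it into the internal struct.
func protoToValidateRequest(req *proto.ValidateObjectStorageRequest) (*validateObjectStorageRequest, error) {
	byt, err := json.Marshal(req)
	if err != nil {
		return nil, fmt.Errorf("unable to marshal the request proto message with error: %w", err)
	}

	var validateRequest validateObjectStorageRequest
	if err := json.Unmarshal(byt, &validateRequest); err != nil {
		return nil, fmt.Errorf("unable to extract data into validation request with error: %w", err)
	}
	return &validateRequest, nil
}
```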
26 changes: 13 additions & 13 deletions warehouse/api/grpc_test.go
Expand Up @@ -945,7 +945,7 @@ func TestGRPC(t *testing.T) {
require.NoError(t, err)
require.NotEmpty(t, res)
require.Empty(t, res.GetError())
require.Equal(t, res.GetData(), `{"steps":[{"id":1,"name":"Verifying Object Storage","success":false,"error":""},{"id":2,"name":"Verifying Connections","success":false,"error":""},{"id":3,"name":"Verifying Create Schema","success":false,"error":""},{"id":4,"name":"Verifying Create and Alter Table","success":false,"error":""},{"id":5,"name":"Verifying Fetch Schema","success":false,"error":""},{"id":6,"name":"Verifying Load Table","success":false,"error":""}]}`)
require.Equal(t, res.GetData(), `{"steps":[{"id":1,"name":"Verifying Object Storage","success":false,"error":""},{"id":2,"name":"Verifying Connections","success":false,"error":""},{"id":3,"name":"Verifying Create Schema","success":false,"error":""},{"id":4,"name":"Verifying Create and Alter Table","success":false,"error":""},{"id":5,"name":"Verifying Fetch Schema","success":false,"error":""},{"id":6,"name":"Verifying Load Table","success":false,"error":""}]}`)
})
t.Run("validate", func(t *testing.T) {
res, err := grpcClient.Validate(ctx, &proto.WHValidationRequest{
Expand All @@ -955,7 +955,7 @@ func TestGRPC(t *testing.T) {
require.NoError(t, err)
require.NotEmpty(t, res)
require.Empty(t, res.GetError())
require.Equal(t, res.GetData(), `{"success":true,"error":"","steps":[{"id":1,"name":"Verifying Object Storage","success":true,"error":""},{"id":2,"name":"Verifying Connections","success":true,"error":""},{"id":3,"name":"Verifying Create Schema","success":true,"error":""},{"id":4,"name":"Verifying Create and Alter Table","success":true,"error":""},{"id":5,"name":"Verifying Fetch Schema","success":true,"error":""},{"id":6,"name":"Verifying Load Table","success":true,"error":""}]}`)
require.Equal(t, res.GetData(), `{"success":true,"error":"","steps":[{"id":1,"name":"Verifying Object Storage","success":true,"error":""},{"id":2,"name":"Verifying Connections","success":true,"error":""},{"id":3,"name":"Verifying Create Schema","success":true,"error":""},{"id":4,"name":"Verifying Create and Alter Table","success":true,"error":""},{"id":5,"name":"Verifying Fetch Schema","success":true,"error":""},{"id":6,"name":"Verifying Load Table","success":true,"error":""}]}`)
})
})
t.Run("tunneling", func(t *testing.T) {
Expand Down Expand Up @@ -1273,7 +1273,7 @@ func TestGRPC(t *testing.T) {
errorJson, err := json.Marshal(error)
require.NoError(t, err)

_, err = db.ExecContext(ctx, `UPDATE wh_uploads SET error = $1, error_category = $2 WHERE id = $3`,
_, err = db.ExecContext(ctx, `UPDATE wh_uploads SET error = $1, error_category = $2 WHERE id = $3`,
errorJson,
errorCategory,
uploadID,
Expand Down Expand Up @@ -1324,7 +1324,7 @@ func TestGRPC(t *testing.T) {
}{
{
status: "internal_processing_failed",
error: json.RawMessage(`{"internal_processing_failed":{"errors":["some error 1","some error 2"],"attempt":2}}`),
error: json.RawMessage(`{"internal_processing_failed":{"errors":["some error 1","some error 2"],"attempt":2}}`),
errorCategory: model.UncategorizedError,
prepareTableUploads: false,
timings: model.Timings{
Expand All @@ -1335,7 +1335,7 @@ func TestGRPC(t *testing.T) {
},
{
status: "generating_load_files_failed",
error: json.RawMessage(`{"generating_load_files_failed":{"errors":["some error 3","some error 4"],"attempt":2}}`),
error: json.RawMessage(`{"generating_load_files_failed":{"errors":["some error 3","some error 4"],"attempt":2}}`),
errorCategory: model.UncategorizedError,
prepareTableUploads: false,
timings: model.Timings{
Expand All @@ -1346,7 +1346,7 @@ func TestGRPC(t *testing.T) {
},
{
status: "exporting_data_failed",
error: json.RawMessage(`{"exporting_data_failed":{"errors":["some error 5","some error 6"],"attempt":2}}`),
error: json.RawMessage(`{"exporting_data_failed":{"errors":["some error 5","some error 6"],"attempt":2}}`),
errorCategory: model.PermissionError,
prepareTableUploads: true,
timings: model.Timings{
Expand All @@ -1357,7 +1357,7 @@ func TestGRPC(t *testing.T) {
},
{
status: "aborted",
error: json.RawMessage(`{"exporting_data_failed":{"errors":["some error 7","some error 8"],"attempt":2}}`),
error: json.RawMessage(`{"exporting_data_failed":{"errors":["some error 7","some error 8"],"attempt":2}}`),
prepareTableUploads: true,
errorCategory: model.ResourceNotFoundError,
timings: model.Timings{
Expand Down Expand Up @@ -1448,7 +1448,7 @@ func TestGRPC(t *testing.T) {
})
require.EqualValues(t, failedBatches, []model.RetrieveFailedBatchesResponse{
{
Error: "some error 6",
Error: "some response 6",
ErrorCategory: model.PermissionError,
SourceID: sourceID,
TotalEvents: 500,
Expand All @@ -1458,7 +1458,7 @@ func TestGRPC(t *testing.T) {
Status: model.Failed,
},
{
Error: "some error 8",
Error: "some response 8",
ErrorCategory: model.ResourceNotFoundError,
SourceID: sourceID,
TotalEvents: 500,
Expand All @@ -1468,7 +1468,7 @@ func TestGRPC(t *testing.T) {
Status: model.Aborted,
},
{
Error: "some error 2",
Error: "some response 2",
ErrorCategory: model.UncategorizedError,
SourceID: sourceID,
TotalEvents: 1200,
Expand Down Expand Up @@ -1518,7 +1518,7 @@ func TestGRPC(t *testing.T) {
})
require.EqualValues(t, failedBatches, []model.RetrieveFailedBatchesResponse{
{
Error: "some error 6",
Error: "some response 6",
ErrorCategory: model.PermissionError,
SourceID: sourceID,
TotalEvents: 500,
Expand All @@ -1528,7 +1528,7 @@ func TestGRPC(t *testing.T) {
Status: model.Failed,
},
{
Error: "some error 8",
Error: "some response 8",
ErrorCategory: model.ResourceNotFoundError,
SourceID: sourceID,
TotalEvents: 500,
Expand All @@ -1538,7 +1538,7 @@ func TestGRPC(t *testing.T) {
Status: model.Aborted,
},
{
Error: "some error 2",
Error: "some response 2",
ErrorCategory: model.UncategorizedError,
SourceID: sourceID,
TotalEvents: 1200,
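The fixtures store each upload's error as JSON keyed by the failing state, e.g. {"exporting_data_failed":{"errors":[...],"attempt":2}}, and the retrieve-failed-batches response surfaces only the most recent message ("some error 6"). A standalone sketch of how that last message can be pulled out of such a payload with nothing but encoding/json; the helper is illustrative and not the server's implementation:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// lastErrorMessage decodes an upload error payload of the shape
// {"<state>":{"errors":[...],"attempt":N}} and returns the most recent error.
func lastErrorMessage(raw json.RawMessage) (string, error) {
	var byState map[string]struct {
		Errors  []string `json:"errors"`
		Attempt int      `json:"attempt"`
	}
	if err := json.Unmarshal(raw, &byState); err != nil {
		return "", err
	}
	for _, v := range byState {
		if n := len(v.Errors); n > 0 {
			return v.Errors[n-1], nil
		}
	}
	return "", nil
}

func main() {
	raw := json.RawMessage(`{"exporting_data_failed":{"errors":["some error 5","some error 6"],"attempt":2}}`)
	msg, err := lastErrorMessage(raw)
	if err != nil {
		panic(err)
	}
	fmt.Println(msg) // some error 6
}
```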
2 changes: 1 addition & 1 deletion warehouse/api/http_test.go
Expand Up @@ -224,7 +224,7 @@ func TestHTTPApi(t *testing.T) {
SourceID: sourceID,
DestinationID: destinationID,
Status: warehouseutils.StagingFileWaitingState,
Error: fmt.Errorf("dummy error"),
Error: fmt.Errorf("dummy error"),
FirstEventAt: now.Add(time.Second),
UseRudderStorage: true,
DestinationRevisionID: "destination_revision_id",
10 changes: 5 additions & 5 deletions warehouse/archive/archiver.go
Expand Up @@ -128,7 +128,7 @@ func (a *Archiver) backupRecords(ctx context.Context, args backupRecordsArgs) (b
Config: filemanagerutil.GetProviderConfigForBackupsFromEnv(ctx, a.conf),
})
if err != nil {
err = fmt.Errorf("error in creating a file manager for:%s. Error: %w",
err = fmt.Errorf("error in creating a file manager for:%s. Error: %w",
a.conf.GetString("JOBS_BACKUP_STORAGE_PROVIDER", "S3"), err,
)
return
Expand Down Expand Up @@ -177,7 +177,7 @@ func (a *Archiver) deleteFilesInStorage(ctx context.Context, locations []string)
Config: misc.GetRudderObjectStorageConfig(""),
})
if err != nil {
err = fmt.Errorf("error in creating a file manager for Rudder Storage. Error: %w", err)
err = fmt.Errorf("error in creating a file manager for Rudder Storage. Error: %w", err)
return err
}

Expand Down Expand Up @@ -288,7 +288,7 @@ func (a *Archiver) archiveUploads(ctx context.Context, maxArchiveLimit int) erro
)
defer func() {
if err != nil {
a.log.Errorf(`[Archiver]: Error occurred while archiving for warehouse uploads with error: %v`, err)
a.log.Errorf(`[Archiver]: Error occurred while archiving for warehouse uploads with error: %v`, err)
a.archiveFailedStat.Increment()
}
}()
Expand Down Expand Up @@ -507,7 +507,7 @@ func (a *Archiver) deleteLoadFileRecords(

err = a.deleteFilesInStorage(ctx, loadLocations)
if err != nil {
return fmt.Errorf("error deleting files in storage: %w", err)
return fmt.Errorf("error deleting files in storage: %w", err)
}

return nil
Expand Down Expand Up @@ -557,7 +557,7 @@ func (a *Archiver) deleteUploads(ctx context.Context, limit int) (int64, error)
limit,
)
if err != nil {
return 0, fmt.Errorf("error deleting uploads: %w", err)
return 0, fmt.Errorf("error deleting uploads: %w", err)
}
return result.RowsAffected()
}
2 changes: 1 addition & 1 deletion warehouse/encoding/jsonreader.go
Expand Up @@ -44,7 +44,7 @@ func (js *jsonReader) Read(columnNames []string) ([]string, error) {

// newJSONReader returns a new JSON reader
// default scanner buffer maxCapacity is 64K
// set it to higher value to avoid read stop on read size error
// set it to higher value to avoid read stop on read size error
func newJSONReader(r io.Reader, bufferCapacityInK int) *jsonReader {
maxCapacity := bufferCapacityInK * 1024

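The comment above refers to bufio.Scanner's default 64K maximum token size: long JSON lines in load files would abort the scan with bufio.ErrTooLong unless the buffer is enlarged. A small, self-contained sketch of the same adjustment:

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
)

// newScannerWithCapacity mirrors the idea in newJSONReader: raise the scanner's
// maximum token size so long JSON lines do not stop the read with ErrTooLong.
func newScannerWithCapacity(r io.Reader, bufferCapacityInK int) *bufio.Scanner {
	maxCapacity := bufferCapacityInK * 1024

	s := bufio.NewScanner(r)
	s.Buffer(make([]byte, maxCapacity), maxCapacity)
	return s
}

func main() {
	s := newScannerWithCapacity(strings.NewReader(`{"column":"value"}`), 512)
	for s.Scan() {
		fmt.Println(s.Text())
	}
}
```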
4 changes: 2 additions & 2 deletions warehouse/identity/identity.go
Expand Up @@ -514,7 +514,7 @@ func (idr *Identity) Resolve(ctx context.Context) (err error) {
defer misc.RemoveFilePaths(loadFileNames...)
loadFileNames, err = idr.downloader.Download(ctx, idr.whMergeRulesTable())
if err != nil {
pkgLogger.Errorf(`IDR: Failed to download load files for %s with error: %v`, idr.mergeRulesTable(), err)
pkgLogger.Errorf(`IDR: Failed to download load files for %s with error: %v`, idr.mergeRulesTable(), err)
return
}

Expand All @@ -528,7 +528,7 @@ func (idr *Identity) ResolveHistoricIdentities(ctx context.Context) (err error)
err = idr.warehouseManager.DownloadIdentityRules(ctx, &gzWriter)
_ = gzWriter.CloseGZ()
if err != nil {
pkgLogger.Errorf(`IDR: Failed to download identity information from warehouse with error: %v`, err)
pkgLogger.Errorf(`IDR: Failed to download identity information from warehouse with error: %v`, err)
return
}
loadFileNames = append(loadFileNames, path)
Expand Down
4 changes: 2 additions & 2 deletions warehouse/integrations/azure-synapse/azure-synapse.go
Expand Up @@ -433,11 +433,11 @@ func (as *AzureSynapse) loadDataIntoStagingTable(
// it is necessary to handle the scenario where new columns are added to the target table.
// Without this adjustment, attempting to perform 'copyIn' when the column count in the
// target table does not match the column count specified in the input data will result
// in an error like:
// in an error like:
//
// mssql: Column count in target table does not match column count specified in input.
//
// If this error is encountered, it is important to verify that the column structure in
// If this error is encountered, it is important to verify that the column structure in
// the source data matches the destination table's structure. If you are using the BCP command,
// ensure that the format file's column count matches the destination table. For SSIS data imports,
// double-check that the column mappings are consistent with the target table.
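The comment documents the mssql "Column count in target table does not match column count specified in input" failure: the bulk copy has to list exactly the columns present in the load data, since the target table may have gained columns after the files were written. A hedged sketch of that alignment using go-mssqldb's CopyIn helper; the function, table and column names are illustrative, not the repository's implementation:

```go
package example

import (
	"context"
	"database/sql"
	"fmt"

	mssql "github.com/denisenkom/go-mssqldb"
)

// copyIntoStaging bulk-copies rows into the staging table, listing only the
// columns present in the load data so the column counts always match.
func copyIntoStaging(ctx context.Context, txn *sql.Tx, stagingTable string, columns []string, rows [][]interface{}) error {
	stmt, err := txn.PrepareContext(ctx, mssql.CopyIn(stagingTable, mssql.BulkOptions{}, columns...))
	if err != nil {
		return fmt.Errorf("preparing bulk copy: %w", err)
	}
	defer func() { _ = stmt.Close() }()

	for _, row := range rows {
		if _, err := stmt.ExecContext(ctx, row...); err != nil {
			return fmt.Errorf("copying row: %w", err)
		}
	}
	// A final Exec with no arguments flushes the pending bulk copy.
	if _, err := stmt.ExecContext(ctx); err != nil {
		return fmt.Errorf("flushing bulk copy: %w", err)
	}
	return nil
}
```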
6 changes: 3 additions & 3 deletions warehouse/integrations/azure-synapse/azure_synapse_test.go
Expand Up @@ -716,7 +716,7 @@ func TestIntegration(t *testing.T) {
require.Equal(t, 0, count, "staging table should be dropped")
})

t.Run("query error", func(t *testing.T) {
t.Run("query response", func(t *testing.T) {
az := azuresynapse.New(config.New(), logger.NOP, stats.NOP)
err := az.Setup(ctx, warehouse, mockUploader)
require.NoError(t, err)
Expand All @@ -727,12 +727,12 @@ func TestIntegration(t *testing.T) {
_ = db.Close()
}()

dbMock.ExpectQuery("select table_name").WillReturnError(fmt.Errorf("query error"))
dbMock.ExpectQuery("select table_name").WillReturnError(fmt.Errorf("query error"))

// TODO: Add more test cases
az.DB = sqlquerywrapper.New(db)
err = az.CrashRecover(ctx)
require.ErrorContains(t, err, "query error")
require.ErrorContains(t, err, "query error")
})
})
}
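The "query error" sub-test stubs the database with sqlmock so any query matching "select table_name" fails, then asserts CrashRecover surfaces that error. A minimal, self-contained sketch of the same stubbing pattern with DATA-DOG/go-sqlmock; the test body is illustrative rather than the repository's test:

```go
package example

import (
	"fmt"
	"testing"

	sqlmock "github.com/DATA-DOG/go-sqlmock"
	"github.com/stretchr/testify/require"
)

func TestQueryErrorPropagates(t *testing.T) {
	db, dbMock, err := sqlmock.New()
	require.NoError(t, err)
	defer func() { _ = db.Close() }()

	// Any query whose SQL matches the regexp "select table_name" returns an error.
	dbMock.ExpectQuery("select table_name").WillReturnError(fmt.Errorf("query error"))

	_, err = db.Query("select table_name from information_schema.tables")
	require.ErrorContains(t, err, "query error")
	require.NoError(t, dbMock.ExpectationsWereMet())
}
```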
8 changes: 4 additions & 4 deletions warehouse/integrations/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ func checkAndIgnoreAlreadyExistError(err error) bool {
var e *googleapi.Error
if errors.As(err, &e) {
// 409 is returned when we try to create a table that already exists
// 400 is returned for all kinds of invalid input - so we need to check the error message too
// 400 is returned for all kinds of invalid input - so we need to check the error message too
if e.Code == 409 || (e.Code == 400 && strings.Contains(e.Message, "already exists in schema")) {
return true
}
Expand Down Expand Up @@ -545,7 +545,7 @@ func (bq *BigQuery) loadTableByAppend(
}

// jobStatistics returns statistics for a job
// In case of rate limit error, it returns empty statistics
// In case of rate limit error, it returns empty statistics
func (bq *BigQuery) jobStatistics(
ctx context.Context,
job *bigquery.Job,
Expand All @@ -564,7 +564,7 @@ func (bq *BigQuery) jobStatistics(
)
bqJob, err := bqJobGetCall.Context(ctx).Location(job.Location()).Fields("statistics").Do()
if err != nil {
// In case of rate limit error, return empty statistics
// In case of rate limit error, return empty statistics
var e *googleapi.Error
if errors.As(err, &e) && e.Code == 429 {
return &bqservice.JobStatistics{}, nil
Expand Down Expand Up @@ -899,7 +899,7 @@ func (bq *BigQuery) AddColumns(ctx context.Context, tableName string, columnsInf
}
_, err = tableRef.Update(ctx, tableMetadataToUpdate, meta.ETag)

// Handle error in case of single column
// Handle error in case of single column
if len(columnsInfo) == 1 {
if err != nil {
if checkAndIgnoreAlreadyExistError(err) {
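checkAndIgnoreAlreadyExistError and jobStatistics both rely on unwrapping the typed *googleapi.Error to branch on HTTP status codes: 409 (and 400 with an "already exists" message) is treated as benign, while a 429 makes jobStatistics return empty statistics instead of failing. A small sketch of that errors.As pattern, with an illustrative main:

```go
package main

import (
	"errors"
	"fmt"
	"strings"

	"google.golang.org/api/googleapi"
)

// isAlreadyExists mirrors the check above: 409 always means the object exists,
// while 400 covers all invalid input and therefore only counts when the
// message says the object already exists.
func isAlreadyExists(err error) bool {
	var e *googleapi.Error
	if errors.As(err, &e) {
		return e.Code == 409 || (e.Code == 400 && strings.Contains(e.Message, "already exists in schema"))
	}
	return false
}

func main() {
	err := fmt.Errorf("creating table: %w", &googleapi.Error{Code: 409, Message: "Already Exists"})
	fmt.Println(isAlreadyExists(err)) // true
}
```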
2 changes: 1 addition & 1 deletion warehouse/integrations/bigquery/bigquery_test.go
Expand Up @@ -1381,7 +1381,7 @@ func dropSchema(t testing.TB, db *bigquery.Client, namespace string) {

require.Eventually(t, func() bool {
if err := db.Dataset(namespace).DeleteWithContents(context.Background()); err != nil {
t.Logf("error deleting dataset: %v", err)
t.Logf("error deleting dataset: %v", err)
return false
}
return true
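dropSchema keeps retrying the dataset deletion until it succeeds, using testify's require.Eventually to poll with a timeout. A hedged sketch of the same retry shape; the helper and timings are illustrative:

```go
package example

import (
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

// requireEventuallyDropped polls a cleanup function until it reports success,
// failing the test if it never succeeds within the deadline.
func requireEventuallyDropped(t testing.TB, drop func() error) {
	t.Helper()
	require.Eventually(t, func() bool {
		if err := drop(); err != nil {
			t.Logf("error deleting dataset: %v", err)
			return false
		}
		return true
	}, time.Minute, 5*time.Second)
}
```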
2 changes: 1 addition & 1 deletion warehouse/integrations/bigquery/partition.go
Expand Up @@ -29,7 +29,7 @@ var supportedPartitionTypeMap = map[string]bigquery.TimePartitioningType{

// avoidPartitionDecorator returns true if custom partition is enabled via destination or global config
// if we know beforehand that the data is in a single partition, specifying the partition decorator can improve write performance.
// However, in case of time-unit column and integer-range partitioned tables, the partition ID specified in the decorator must match the data being written. Otherwise, an error occurs.
// However, in case of time-unit column and integer-range partitioned tables, the partition ID specified in the decorator must match the data being written. Otherwise, an error occurs.
// Therefore, if we are not sure about the partition decorator, we should not specify it i.e. in case when it is enabled via destination config.
func (bq *BigQuery) avoidPartitionDecorator() bool {
return bq.customPartitionEnabledViaGlobalConfig() || bq.isTimeUnitPartitionColumn()
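The partition decorator the comment refers to is BigQuery's "table$partitionid" suffix: writing to "events$20241018" targets that day's partition directly, which can speed up single-partition writes but is only valid when the data genuinely belongs to that partition. A small illustrative sketch of applying or skipping the decorator; the helper is not from this commit:

```go
package main

import (
	"fmt"
	"time"
)

// tableWithDecorator appends a daily partition decorator ("table$YYYYMMDD")
// when the target partition is known up front; otherwise it returns the bare
// table name and lets BigQuery route rows to partitions itself.
func tableWithDecorator(table string, partition *time.Time) string {
	if partition == nil {
		return table
	}
	return fmt.Sprintf("%s$%s", table, partition.Format("20060102"))
}

func main() {
	day := time.Date(2024, 10, 18, 0, 0, 0, 0, time.UTC)
	fmt.Println(tableWithDecorator("tracks", &day)) // tracks$20241018
	fmt.Println(tableWithDecorator("tracks", nil))  // tracks
}
```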