Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Supporting PostgreSQL as a destination #311

Draft
wants to merge 14 commits into
base: master
Choose a base branch
from
22 changes: 22 additions & 0 deletions clients/postgresql/cast.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package postgresql

import (
"github.com/artie-labs/transfer/lib/typing/values"

"github.com/artie-labs/transfer/lib/typing/columns"
)

// CastColValStaging - takes `colVal` interface{} and `colKind` typing.Column and converts the value into a string value
// This is necessary because CSV writers require values to in `string`.
func (s *Store) CastColValStaging(colVal interface{}, colKind columns.Column, additionalDateFmts []string) (any, error) {
if colVal == nil {
return nil, nil
}

colValString, err := values.ToString(colVal, colKind, additionalDateFmts)
if err != nil {
return "", err
}

return colValString, nil
}
136 changes: 136 additions & 0 deletions clients/postgresql/merge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package postgresql

import (
"context"
"fmt"
"log/slog"
"strings"

"github.com/artie-labs/transfer/lib/sql"

"github.com/artie-labs/transfer/clients/utils"
"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/destination/ddl"
"github.com/artie-labs/transfer/lib/destination/dml"
"github.com/artie-labs/transfer/lib/optimization"
"github.com/artie-labs/transfer/lib/ptr"
"github.com/artie-labs/transfer/lib/typing/columns"
)

func (s *Store) Merge(ctx context.Context, tableData *optimization.TableData) error {
if tableData.Rows() == 0 || tableData.ReadOnlyInMemoryCols() == nil {
// There's no rows or columns. Let's skip.
return nil
}

tableConfig, err := s.getTableConfig(tableData)
if err != nil {
return err
}

fqName := tableData.ToFqName(s.Label(), true, s.config.SharedDestinationConfig.UppercaseEscapedNames, "")
// Check if all the columns exist in Redshift
srcKeysMissing, targetKeysMissing := columns.Diff(tableData.ReadOnlyInMemoryCols(), tableConfig.Columns(),
tableData.TopicConfig.SoftDelete, tableData.TopicConfig.IncludeArtieUpdatedAt, tableData.TopicConfig.IncludeDatabaseUpdatedAt)

fmt.Println("srcKeysMissing", srcKeysMissing, "targetKeysMissing", targetKeysMissing)

createAlterTableArgs := ddl.AlterTableArgs{
Dwh: s,
Tc: tableConfig,
FqTableName: fqName,
CreateTable: tableConfig.CreateTable(),
ColumnOp: constants.Add,
CdcTime: tableData.LatestCDCTs,
UppercaseEscNames: &s.config.SharedDestinationConfig.UppercaseEscapedNames,
}

// Keys that exist in CDC stream, but not in Redshift
err = ddl.AlterTable(createAlterTableArgs, targetKeysMissing...)
if err != nil {
slog.Warn("Failed to apply alter table", slog.Any("err", err))
return err
}

// Keys that exist in Redshift, but don't exist in our CDC stream.
// createTable is set to false because table creation requires a column to be added
// Which means, we'll only do it upon Add columns.
deleteAlterTableArgs := ddl.AlterTableArgs{
Dwh: s,
Tc: tableConfig,
FqTableName: fqName,
CreateTable: false,
ColumnOp: constants.Delete,
ContainOtherOperations: tableData.ContainOtherOperations(),
CdcTime: tableData.LatestCDCTs,
UppercaseEscNames: &s.config.SharedDestinationConfig.UppercaseEscapedNames,
}

err = ddl.AlterTable(deleteAlterTableArgs, srcKeysMissing...)
if err != nil {
slog.Warn("Failed to apply alter table", slog.Any("err", err))
return err
}

tableConfig.AuditColumnsToDelete(srcKeysMissing)
tableData.MergeColumnsFromDestination(tableConfig.Columns().GetColumns()...)

// TODO: Remove strings.toLower() in favor of pq.quoteIdentifer.
temporaryTableName := strings.ToLower(fmt.Sprintf("%s_%s", tableData.ToFqName(s.Label(), false, s.config.SharedDestinationConfig.UppercaseEscapedNames, ""), tableData.TempTableSuffix()))
if err = s.prepareTempTable(tableData, tableConfig, temporaryTableName); err != nil {
return err
}

// Now iterate over all the in-memory cols and see which one requires backfill.
for _, col := range tableData.ReadOnlyInMemoryCols().GetColumns() {
if col.ShouldSkip() {
continue
}

err = utils.BackfillColumn(s.config, s, col, fqName)
if err != nil {
return fmt.Errorf("failed to backfill col: %v, default value: %v, err: %v", col.RawName(), col.RawDefaultValue(), err)
}

tableConfig.Columns().UpsertColumn(col.RawName(), columns.UpsertColumnArg{
Backfilled: ptr.ToBool(true),
})
}

mergArg := dml.MergeArgument{
FqTableName: fqName,
SubQuery: temporaryTableName,
IdempotentKey: tableData.TopicConfig.IdempotentKey,
PrimaryKeys: tableData.PrimaryKeys(s.config.SharedDestinationConfig.UppercaseEscapedNames, &sql.NameArgs{Escape: true, DestKind: s.Label()}),
ColumnsToTypes: *tableData.ReadOnlyInMemoryCols(),
ContainsHardDeletes: tableData.ContainsHardDeletes(),
SoftDelete: tableData.TopicConfig.SoftDelete,
DestKind: s.Label(),
UppercaseEscNames: &s.config.SharedDestinationConfig.UppercaseEscapedNames,
}

// Prepare merge statement
mergeParts, err := mergArg.GetParts()
if err != nil {
return fmt.Errorf("failed to generate merge statement, err: %v", err)
}

tx, err := s.Begin()
if err != nil {
return fmt.Errorf("failed to start tx, err: %v", err)
}

for _, mergeQuery := range mergeParts {
_, err = tx.Exec(mergeQuery)
if err != nil {
return fmt.Errorf("failed to merge, query: %v, err: %v", mergeQuery, err)
}
}

if err = tx.Commit(); err != nil {
return fmt.Errorf("failed to merge, parts: %v, err: %v", mergeParts, err)
}

_ = ddl.DropTemporaryTable(s, temporaryTableName, false)
return err
}
82 changes: 82 additions & 0 deletions clients/postgresql/postgresql.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package postgresql

import (
"github.com/artie-labs/transfer/lib/optimization"

"github.com/artie-labs/transfer/clients/utils"

"github.com/artie-labs/transfer/lib/ptr"

"github.com/artie-labs/transfer/lib/config"
_ "github.com/lib/pq"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/db"
"github.com/artie-labs/transfer/lib/destination/types"
)

type Store struct {
configMap *types.DwhToTablesConfigMap
config config.Config

db.Store
}

func (s *Store) GetConfigMap() *types.DwhToTablesConfigMap {
if s == nil {
return nil
}

return s.configMap
}

func (s *Store) Label() constants.DestinationKind {
return constants.PostgreSQL
}

const (
describeNameCol = "column_name"
describeTypeCol = "data_type"
describeDescriptionCol = "description"
)

func (s *Store) getTableConfig(tableData *optimization.TableData) (*types.DwhTableConfig, error) {
describeQuery, err := describeTableQuery(describeArgs{
RawTableName: tableData.RawName(),
Schema: tableData.TopicConfig.Schema,
})

if err != nil {
return nil, err
}

return utils.GetTableConfig(utils.GetTableCfgArgs{
Dwh: s,
FqName: tableData.ToFqName(s.Label(), true, s.config.SharedDestinationConfig.UppercaseEscapedNames, ""),
ConfigMap: s.configMap,
Query: describeQuery,
ColumnNameLabel: describeNameCol,
ColumnTypeLabel: describeTypeCol,
ColumnDescLabel: describeDescriptionCol,

EmptyCommentValue: ptr.ToString("<nil>"),
DropDeletedColumns: tableData.TopicConfig.DropDeletedColumns,
})
}

func LoadPostgreSQL(cfg config.Config, _store *db.Store) *Store {
if _store != nil {
// Used for tests.
return &Store{
configMap: &types.DwhToTablesConfigMap{},
config: cfg,
Store: *_store,
}
}

return &Store{
configMap: &types.DwhToTablesConfigMap{},
config: cfg,
Store: db.Open("postgres", cfg.PostgreSQL.ConnectionString()),
}
}
43 changes: 43 additions & 0 deletions clients/postgresql/queries.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package postgresql

import (
"fmt"
"strings"

"github.com/artie-labs/transfer/lib/config/constants"
)

type describeArgs struct {
RawTableName string
Schema string
}

func describeTableQuery(args describeArgs) (string, error) {
if strings.Contains(args.RawTableName, `"`) {
return "", fmt.Errorf("table name cannot contain double quotes")
}

// This query is a modified fork from: https://gist.github.com/alexanderlz/7302623
return fmt.Sprintf(`
SELECT
c.column_name::text,
CASE
WHEN c.data_type = 'numeric' THEN
'numeric(' || COALESCE(CAST(c.numeric_precision AS VARCHAR), '') || ',' || COALESCE(CAST(c.numeric_scale AS VARCHAR), '') || ')'
ELSE
c.data_type
END AS data_type,
c.%s,
d.description
FROM
information_schema.columns c
LEFT JOIN
pg_class c1 ON c.table_name=c1.relname
LEFT JOIN
pg_catalog.pg_namespace n ON c.table_schema=n.nspname AND c1.relnamespace=n.oid
LEFT JOIN
pg_catalog.pg_description d ON d.objsubid=c.ordinal_position AND d.objoid=c1.oid
WHERE
LOWER(c.table_name) = LOWER('%s') AND LOWER(c.table_schema) = LOWER('%s');
`, constants.StrPrecisionCol, args.RawTableName, args.Schema), nil
}
83 changes: 83 additions & 0 deletions clients/postgresql/staging.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package postgresql

import (
"fmt"
"strings"
"time"

"github.com/lib/pq"

"github.com/artie-labs/transfer/lib/config/constants"
"github.com/artie-labs/transfer/lib/destination/ddl"
"github.com/artie-labs/transfer/lib/destination/types"
"github.com/artie-labs/transfer/lib/optimization"
"github.com/artie-labs/transfer/lib/typing"
)

func (s *Store) prepareTempTable(tableData *optimization.TableData, tableConfig *types.DwhTableConfig, tempTableName string) error {
tempAlterTableArgs := ddl.AlterTableArgs{
Dwh: s,
Tc: tableConfig,
FqTableName: tempTableName,
CreateTable: true,
TemporaryTable: true,
ColumnOp: constants.Add,
UppercaseEscNames: &s.config.SharedDestinationConfig.UppercaseEscapedNames,
}

if err := ddl.AlterTable(tempAlterTableArgs, tableData.ReadOnlyInMemoryCols().GetColumns()...); err != nil {
return fmt.Errorf("failed to create temp table, err: %v", err)
}

expiryString := typing.ExpiresDate(time.Now().UTC().Add(ddl.TempTableTTL))
// Now add a comment to the temporary table.
if _, err := s.Exec(fmt.Sprintf(`COMMENT ON TABLE %s IS '%s';`, tempTableName, ddl.ExpiryComment(expiryString))); err != nil {
return fmt.Errorf("failed to add comment to table, tableName: %v, err: %v", tempTableName, err)
}

return s.loadTemporaryTable(tableData, tempTableName)
}

func (s *Store) loadTemporaryTable(tableData *optimization.TableData, newTableName string) error {
tx, err := s.Begin()
if err != nil {
return fmt.Errorf("failed to start tx, err: %w", err)
}

newTableNameParts := strings.Split(newTableName, ".")
if len(newTableNameParts) != 2 {
return fmt.Errorf("invalid table name, tableName: %v", newTableName)
}

columns := tableData.ReadOnlyInMemoryCols().GetColumnsToUpdate(s.config.SharedDestinationConfig.UppercaseEscapedNames, nil)
stmt, err := tx.Prepare(pq.CopyInSchema(newTableNameParts[0], newTableNameParts[1], columns...))
if err != nil {
return fmt.Errorf("failed to prepare table, err: %w", err)
}

additionalDateFmts := s.config.SharedTransferConfig.TypingSettings.AdditionalDateFormats
for _, value := range tableData.RowsData() {
var row []any
for _, col := range columns {
colKind, _ := tableData.ReadOnlyInMemoryCols().GetColumn(col)
castedValue, castErr := s.CastColValStaging(value[col], colKind, additionalDateFmts)
if castErr != nil {
return castErr
}

row = append(row, castedValue)
}

if _, err = stmt.Exec(row...); err != nil {
return fmt.Errorf("failed to copy row, err: %w", err)
}
}

// Close the statement to finish the COPY operation
if _, err = stmt.Exec(); err != nil {
return fmt.Errorf("failed to finalize COPY, err: %w", err)
}

// Commit the transaction
return tx.Commit()
}