Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

schema applyを実装した #15

Merged
merged 1 commit into from
Dec 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ go.work

### Custom ###
!.gitkeep
mise.toml
.go-version
.vscode
tmp
2 changes: 1 addition & 1 deletion cmd/protobq/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func newCliApp() *cli.App {
},
},
Action: func(c *cli.Context) error {
err := internal.Apply(context.Background(), c.String("project-id"), c.String("dataset-id"))
err := internal.ApplySchemaFromFile(context.Background(), c.String("project-id"), c.String("dataset-id"), c.String("input"))
if err != nil {
return err
}
Expand Down
6 changes: 5 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ go 1.23

require (
cloud.google.com/go/bigquery v1.65.0
github.com/bufbuild/protocompile v0.14.1
github.com/google/go-cmp v0.6.0
github.com/google/martian/v3 v3.3.3
github.com/huandu/go-sqlbuilder v1.33.1
github.com/urfave/cli/v2 v2.27.5
google.golang.org/api v0.214.0
google.golang.org/protobuf v1.36.0
Expand All @@ -27,13 +31,13 @@ require (
github.com/google/uuid v1.6.0 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect
github.com/googleapis/gax-go/v2 v2.14.1 // indirect
github.com/huandu/xstrings v1.4.0 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/klauspost/cpuid/v2 v2.2.9 // indirect
github.com/pierrec/lz4/v4 v4.1.22 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.58.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.58.0 // indirect
Expand Down
174 changes: 16 additions & 158 deletions go.sum

Large diffs are not rendered by default.

17 changes: 0 additions & 17 deletions internal/apply.go

This file was deleted.

38 changes: 21 additions & 17 deletions internal/codegen.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,26 @@ var (
Minute: protogen.GoImportPath("time").Ident("Minute"),
}
protobqIdents = struct {
MaterializedView protogen.GoIdent
MaterializedViewOptions protogen.GoIdent
InsertDTO protogen.GoIdent
internal struct {
NewInsertDTOImpl protogen.GoIdent
BQField protogen.GoIdent
internal struct {
MaterializedView protogen.GoIdent
MaterializedViewOptions protogen.GoIdent
InsertDTO protogen.GoIdent
NewInsertDTOImpl protogen.GoIdent
BQField protogen.GoIdent
}
}{
MaterializedView: protogen.GoImportPath("github.com/averak/protobq").Ident("MaterializedView"),
MaterializedViewOptions: protogen.GoImportPath("github.com/averak/protobq").Ident("MaterializedViewOptions"),
InsertDTO: protogen.GoImportPath("github.com/averak/protobq").Ident("InsertDTO"),
internal: struct {
NewInsertDTOImpl protogen.GoIdent
BQField protogen.GoIdent
MaterializedView protogen.GoIdent
MaterializedViewOptions protogen.GoIdent
InsertDTO protogen.GoIdent
NewInsertDTOImpl protogen.GoIdent
BQField protogen.GoIdent
}{
NewInsertDTOImpl: protogen.GoImportPath("github.com/averak/protobq/internal").Ident("NewInsertDTOImpl"),
BQField: protogen.GoImportPath("github.com/averak/protobq/internal").Ident("BQField"),
MaterializedView: protogen.GoImportPath("github.com/averak/protobq/internal").Ident("MaterializedView"),
MaterializedViewOptions: protogen.GoImportPath("github.com/averak/protobq/internal").Ident("MaterializedViewOptions"),
InsertDTO: protogen.GoImportPath("github.com/averak/protobq/internal").Ident("InsertDTO"),
NewInsertDTOImpl: protogen.GoImportPath("github.com/averak/protobq/internal").Ident("NewInsertDTOImpl"),
BQField: protogen.GoImportPath("github.com/averak/protobq/internal").Ident("BQField"),
},
}
)
Expand Down Expand Up @@ -74,23 +77,24 @@ func (g CodeGenerator) Gen() error {
}
ext, _ := proto.GetExtension(msg.Desc.Options(), protobq.E_MaterializedView).(*protobq.MaterializedView)

gf.P("var _ ", protobqIdents.MaterializedView, " = (*", msg.GoIdent.GoName, ")(nil)")
gf.P("var _ ", protobqIdents.internal.MaterializedView, " = (*", msg.GoIdent.GoName, ")(nil)")
gf.P()

gf.P("func (mv *", msg.GoIdent.GoName, ") Name() string {")
gf.P(" return \"", msg.Desc.Name(), "\"")
gf.P("}")
gf.P()

gf.P("func (mv *", msg.GoIdent.GoName, ") Options() ", protobqIdents.MaterializedViewOptions, " {")
gf.P(" return ", protobqIdents.MaterializedViewOptions, "{")
gf.P("func (mv *", msg.GoIdent.GoName, ") Options() ", protobqIdents.internal.MaterializedViewOptions, " {")
gf.P(" return ", protobqIdents.internal.MaterializedViewOptions, "{")
gf.P(" BaseTable: \"", ext.GetBaseTable(), "\",")
gf.P(" EnableRefresh: ", ext.GetEnableRefresh(), ",")
gf.P(" RefreshInterval: ", ext.GetRefreshIntervalMinutes(), " * ", timeIdents.Minute, ",")
gf.P(" }")
gf.P("}")
gf.P()

gf.P("func (mv *", msg.GoIdent.GoName, ") InsertDTO() ", protobqIdents.InsertDTO, " {")
gf.P("func (mv *", msg.GoIdent.GoName, ") InsertDTO() ", protobqIdents.internal.InsertDTO, " {")
gf.P(" res := ", protobqIdents.internal.NewInsertDTOImpl, "(\"", ext.GetBaseTable(), "\", nil)")
for _, field := range msg.Fields {
g.generateAddField(gf, field, nil, "res", "mv")
Expand Down
10 changes: 5 additions & 5 deletions internal/protobuf/example/example.protobq.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

170 changes: 170 additions & 0 deletions internal/schema.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package internal

import (
"fmt"
"strings"
"time"

"cloud.google.com/go/bigquery"
"github.com/averak/protobq/internal/protobuf/protobq"
"github.com/huandu/go-sqlbuilder"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
)

type MaterializedViewOptions struct {
BaseTable string
EnableRefresh bool
RefreshInterval time.Duration

// TODO: クラスタ化列、パーティション列を定義する。
}

type MaterializedView interface {
proto.Message

Name() string
Options() MaterializedViewOptions
InsertDTO() InsertDTO
}

type InsertDTO interface {
TableName() string
Value() map[string]any
}

type BQSchemaConverter struct {
datasetID string
raw MaterializedView
}

func NewBQSchemaConverter(datasetID string, mv MaterializedView) *BQSchemaConverter {
return &BQSchemaConverter{
datasetID: datasetID,
raw: mv,
}
}

func (c BQSchemaConverter) ApplyBaseTableSchema(td *bigquery.TableMetadata) error {
schema := make(bigquery.Schema, 0, c.raw.ProtoReflect().Descriptor().Fields().Len())
for i := range c.raw.ProtoReflect().Descriptor().Fields().Len() {
field := c.raw.ProtoReflect().Descriptor().Fields().Get(i)
fs, err := c.toBQFieldSchema(field)
if err != nil {
return err
}
schema = append(schema, fs)
}

for _, existing := range td.Schema {
if existing.Name == c.raw.Name() {
existing.Schema = schema
return nil
}
}
td.Schema = append(td.Schema, &bigquery.FieldSchema{
Name: c.raw.Name(),
Type: bigquery.RecordFieldType,
Required: false,
Schema: schema,
})
return nil
}

func (c BQSchemaConverter) MaterializedViewSchema() (*bigquery.TableMetadata, error) {
res := &bigquery.TableMetadata{
Name: c.raw.Name(),
MaterializedView: &bigquery.MaterializedViewDefinition{
Query: c.MaterializedViewDDL(),
EnableRefresh: c.raw.Options().EnableRefresh,
RefreshInterval: c.raw.Options().RefreshInterval,
},
// TODO: #7 TimePartitioning, Clustering 定義する
TimePartitioning: nil,
Clustering: nil,
}
return res, nil
}

func (c BQSchemaConverter) MaterializedViewDDL() string {
sb := sqlbuilder.NewSelectBuilder()

// top-level のフィールドのみ DDL に含めれば良い。
cols := make([]string, 0, c.raw.ProtoReflect().Descriptor().Fields().Len())
for i := range c.raw.ProtoReflect().Descriptor().Fields().Len() {
field := c.raw.ProtoReflect().Descriptor().Fields().Get(i)
ext, _ := proto.GetExtension(field.Options(), protobq.E_MaterializedViewField).(*protobq.MaterializedViewField)
if len(ext.GetOriginPath()) > 0 {
cols = append(cols, fmt.Sprintf("%s.%s", c.raw.Name(), strings.Join(ext.GetOriginPath(), ".")))
} else {
cols = append(cols, fmt.Sprintf("%s.%s", c.raw.Name(), field.Name()))
}
}
res, _ := sb.Select(cols...).
From(fmt.Sprintf("%s.%s", c.datasetID, c.raw.Options().BaseTable)).
Where(sb.IsNotNull(c.raw.Name())).
Build()
return res
}

func (c BQSchemaConverter) toBQFieldSchema(field protoreflect.FieldDescriptor) (*bigquery.FieldSchema, error) {
dt, err := c.toBQDataType(field)
if err != nil {
return nil, err
}

res := &bigquery.FieldSchema{
Name: string(field.Name()),
Type: dt,
Required: field.Cardinality() == protoreflect.Required,
Repeated: field.Cardinality() == protoreflect.Repeated,
Schema: nil,
}
if dt == bigquery.RecordFieldType {
for i := range field.Message().Fields().Len() {
nested, err := c.toBQFieldSchema(field.Message().Fields().Get(i))
if err != nil {
return nil, err
}
res.Schema = append(res.Schema, nested)
}
}
return res, nil
}

func (c BQSchemaConverter) toBQDataType(field protoreflect.FieldDescriptor) (bigquery.FieldType, error) {
// https://pkg.go.dev/google.golang.org/protobuf/reflect/protoreflect#Value
var res bigquery.FieldType
switch field.Kind() {
case protoreflect.BoolKind:
res = bigquery.BooleanFieldType
case protoreflect.Int32Kind, protoreflect.Sint32Kind, protoreflect.Sfixed32Kind:
res = bigquery.IntegerFieldType
case protoreflect.Int64Kind, protoreflect.Sint64Kind, protoreflect.Sfixed64Kind:
res = bigquery.IntegerFieldType
case protoreflect.Uint32Kind, protoreflect.Fixed32Kind:
res = bigquery.IntegerFieldType
case protoreflect.Uint64Kind, protoreflect.Fixed64Kind:
res = bigquery.IntegerFieldType
case protoreflect.FloatKind:
res = bigquery.FloatFieldType
case protoreflect.DoubleKind:
res = bigquery.FloatFieldType
case protoreflect.StringKind:
res = bigquery.StringFieldType
case protoreflect.BytesKind:
res = bigquery.BytesFieldType
case protoreflect.EnumKind:
res = bigquery.StringFieldType
case protoreflect.MessageKind, protoreflect.GroupKind:
switch field.Message().FullName() {
case "google.protobuf.Timestamp":
res = bigquery.TimestampFieldType
default:
res = bigquery.RecordFieldType
}
default:
return "", fmt.Errorf("unsupported field type: %s", field.Kind())
}
return res, nil
}
Loading
Loading