Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(storage): flipt to flipt replication #2475

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
27 changes: 26 additions & 1 deletion internal/cmd/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@
"go.flipt.io/flipt/internal/storage"
storagecache "go.flipt.io/flipt/internal/storage/cache"
"go.flipt.io/flipt/internal/storage/fs"
"go.flipt.io/flipt/internal/storage/fs/flipt"
storageoci "go.flipt.io/flipt/internal/storage/fs/oci"
fliptsql "go.flipt.io/flipt/internal/storage/sql"
"go.flipt.io/flipt/internal/storage/sql/mysql"
"go.flipt.io/flipt/internal/storage/sql/postgres"
"go.flipt.io/flipt/internal/storage/sql/sqlite"
evaluationrpc "go.flipt.io/flipt/rpc/flipt/evaluation"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/jaeger"
Expand All @@ -56,6 +58,7 @@
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/health"
"google.golang.org/grpc/reflection"
"google.golang.org/grpc/status"
Expand Down Expand Up @@ -261,6 +264,28 @@
return nil, err
}

store, err = fs.NewStore(logger, source)
if err != nil {
return nil, err
}
case config.FliptStorageType:
opts := []grpc.DialOption{
grpc.WithBlock(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
}
dialCtx, dialCancel := context.WithTimeout(ctx, 5*time.Second)
defer dialCancel()

clientConn, err := grpc.DialContext(dialCtx, cfg.Storage.Flipt.Address, opts...)
if err != nil {
return nil, err
}

Check warning on line 282 in internal/cmd/grpc.go

View check run for this annotation

Codecov / codecov/patch

internal/cmd/grpc.go#L267-L282

Added lines #L267 - L282 were not covered by tests

source, err := flipt.NewSource(logger, ctx, evaluationrpc.NewDataServiceClient(clientConn))
if err != nil {
return nil, err
}

Check warning on line 287 in internal/cmd/grpc.go

View check run for this annotation

Codecov / codecov/patch

internal/cmd/grpc.go#L284-L287

Added lines #L284 - L287 were not covered by tests

store, err = fs.NewStore(logger, source)
if err != nil {
return nil, err
Expand Down Expand Up @@ -330,7 +355,7 @@
fliptsrv = fliptserver.New(logger, store)
metasrv = metadata.New(cfg, info)
evalsrv = evaluation.New(logger, store)
evalDataSrv = evaluationdata.New(logger, store)
evalDataSrv = evaluationdata.New(ctx, logger, store)

Check warning on line 358 in internal/cmd/grpc.go

View check run for this annotation

Codecov / codecov/patch

internal/cmd/grpc.go#L358

Added line #L358 was not covered by tests
healthsrv = health.NewServer()
)

Expand Down
34 changes: 29 additions & 5 deletions internal/config/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
GitStorageType = StorageType("git")
ObjectStorageType = StorageType("object")
OCIStorageType = StorageType("oci")
FliptStorageType = StorageType("flipt")
)

type ObjectSubStorageType string
Expand All @@ -38,14 +39,15 @@
Git *Git `json:"git,omitempty" mapstructure:"git,omitempty" yaml:"git,omitempty"`
Object *Object `json:"object,omitempty" mapstructure:"object,omitempty" yaml:"object,omitempty"`
OCI *OCI `json:"oci,omitempty" mapstructure:"oci,omitempty" yaml:"oci,omitempty"`
Flipt *Flipt `json:"flipt,omitempty" mapstructure:"flipt,omitempty" yaml:"flipt,omitempty"`
ReadOnly *bool `json:"readOnly,omitempty" mapstructure:"read_only,omitempty" yaml:"read_only,omitempty"`
}

func (c *StorageConfig) setDefaults(v *viper.Viper) error {
switch v.GetString("storage.type") {
case string(LocalStorageType):
switch StorageType(v.GetString("storage.type")) {
case LocalStorageType:
v.SetDefault("storage.local.path", ".")
case string(GitStorageType):
case GitStorageType:
v.SetDefault("storage.git.ref", "main")
v.SetDefault("storage.git.poll_interval", "30s")
v.SetDefault("storage.git.insecure_skip_tls", false)
Expand All @@ -54,15 +56,15 @@
v.GetString("storage.git.authentication.ssh.private_key_bytes") != "" {
v.SetDefault("storage.git.authentication.ssh.user", "git")
}
case string(ObjectStorageType):
case ObjectStorageType:
// keep this as a case statement in anticipation of
// more object types in the future
// nolint:gocritic
switch v.GetString("storage.object.type") {
case string(S3ObjectSubStorageType):
v.SetDefault("storage.object.s3.poll_interval", "1m")
}
case string(OCIStorageType):
case OCIStorageType:
v.SetDefault("storage.oci.poll_interval", "30s")

dir, err := DefaultBundleDir()
Expand All @@ -71,6 +73,8 @@
}

v.SetDefault("storage.oci.bundles_directory", dir)
case FliptStorageType:
v.SetDefault("storage.flipt.address", "localhost:9000")

Check warning on line 77 in internal/config/storage.go

View check run for this annotation

Codecov / codecov/patch

internal/config/storage.go#L76-L77

Added lines #L76 - L77 were not covered by tests
default:
v.SetDefault("storage.type", "database")
}
Expand Down Expand Up @@ -115,6 +119,10 @@
if _, err := oci.ParseReference(c.OCI.Repository); err != nil {
return fmt.Errorf("validating OCI configuration: %w", err)
}
case FliptStorageType:
if err := c.Flipt.validate(); err != nil {
return err
}

Check warning on line 125 in internal/config/storage.go

View check run for this annotation

Codecov / codecov/patch

internal/config/storage.go#L122-L125

Added lines #L122 - L125 were not covered by tests
}

// setting read only mode is only supported with database storage
Expand Down Expand Up @@ -279,6 +287,7 @@
Password string `json:"-" mapstructure:"password" yaml:"-"`
}

// DefaultBundleDir returns the location for storing OCI bundles of Flipt state
func DefaultBundleDir() (string, error) {
dir, err := Dir()
if err != nil {
Expand All @@ -292,3 +301,18 @@

return bundlesDir, nil
}

// Flipt configures another Flipt as a source for feature flag state
// In this mode, Flipt will subscribe to snapshots of state from the
// Flipt instance(s) reachable from the configured address.
type Flipt struct {
Address string `json:"address,omitempty" mapstructure:"address" yaml:"address,omitempty"`
}

func (f *Flipt) validate() error {
if f.Address == "" {
return errors.New("flipt address cannot be empty")
}

Check warning on line 315 in internal/config/storage.go

View check run for this annotation

Codecov / codecov/patch

internal/config/storage.go#L312-L315

Added lines #L312 - L315 were not covered by tests

return nil

Check warning on line 317 in internal/config/storage.go

View check run for this annotation

Codecov / codecov/patch

internal/config/storage.go#L317

Added line #L317 was not covered by tests
}