Skip to content

Commit

Permalink
add recover model from checkpoint first if it exists, store is local
Browse files Browse the repository at this point in the history
  • Loading branch information
kitianFresh committed Jun 23, 2022
1 parent fde8461 commit 1d0091f
Show file tree
Hide file tree
Showing 13 changed files with 541 additions and 31 deletions.
31 changes: 28 additions & 3 deletions cmd/craned/app/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package app
import (
"context"
"flag"
"fmt"
"os"
"strings"

Expand All @@ -25,6 +26,7 @@ import (
predictionapi "github.com/gocrane/api/prediction/v1alpha1"

"github.com/gocrane/crane/cmd/craned/app/options"
"github.com/gocrane/crane/pkg/checkpoint"
"github.com/gocrane/crane/pkg/controller/analytics"
"github.com/gocrane/crane/pkg/controller/cnp"
"github.com/gocrane/crane/pkg/controller/ehpa"
Expand Down Expand Up @@ -108,7 +110,11 @@ func Run(ctx context.Context, opts *options.Options) error {
}
// initialize data sources and predictor
realtimeDataSources, histroyDataSources, _ := initDataSources(mgr, opts)
predictorMgr := initPredictorManager(opts, realtimeDataSources, histroyDataSources)
predictorMgr, err := initPredictorManager(opts, realtimeDataSources, histroyDataSources)
if err != nil {
klog.Error(err, "failed to init predictor mgr")
return err
}

initScheme()
initWebhooks(mgr, opts)
Expand Down Expand Up @@ -197,8 +203,27 @@ func initDataSources(mgr ctrl.Manager, opts *options.Options) (map[providers.Dat
return realtimeDataSources, historyDataSources, hybridDataSources
}

func initPredictorManager(opts *options.Options, realtimeDataSources map[providers.DataSourceType]providers.RealTime, historyDataSources map[providers.DataSourceType]providers.History) predictor.Manager {
return predictor.NewManager(realtimeDataSources, historyDataSources, predictor.DefaultPredictorsConfig(opts.AlgorithmModelConfig))
// initPredictorManager builds the predictor manager from the given realtime
// and history data sources and the algorithm model config in opts.
//
// When opts.EnableCheckpointer is set, a checkpointer is created first for the
// store type named by opts.CheckpointerStore; only the local store type is
// accepted here. An error is returned when the store type is not supported or
// when the checkpointer cannot be initialized. When checkpointing is disabled,
// the manager receives a nil Checkpointer with Enable set to false.
func initPredictorManager(opts *options.Options, realtimeDataSources map[providers.DataSourceType]providers.RealTime, historyDataSources map[providers.DataSourceType]providers.History) (predictor.Manager, error) {
	var checkpointer checkpoint.Checkpointer
	if opts.EnableCheckpointer {
		// Convert the flag value once and reuse it for dispatch, init and error reporting.
		cpStoreType := checkpoint.StoreType(opts.CheckpointerStore)
		switch cpStoreType {
		case checkpoint.StoreTypeLocal:
			var err error
			checkpointer, err = checkpoint.InitCheckpointer(cpStoreType, opts.CheckpointerLocalConfig)
			if err != nil {
				// Wrap so the caller's log shows which store failed while keeping the cause inspectable.
				return nil, fmt.Errorf("init %v checkpointer: %w", cpStoreType, err)
			}
		default:
			return nil, fmt.Errorf("not supported checkpointer store type %v", cpStoreType)
		}
	}

	// Named cpCtx rather than ctx: this is not a context.Context.
	cpCtx := predictor.CheckPointerContext{
		Enable:       opts.EnableCheckpointer,
		Checkpointer: checkpointer,
	}
	return predictor.NewManager(realtimeDataSources, historyDataSources, predictor.DefaultPredictorsConfig(opts.AlgorithmModelConfig), cpCtx), nil
}

// initControllers setup controllers with manager
Expand Down
12 changes: 11 additions & 1 deletion cmd/craned/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/spf13/pflag"
componentbaseconfig "k8s.io/component-base/config"

"github.com/gocrane/crane/pkg/checkpoint"
"github.com/gocrane/crane/pkg/controller/ehpa"
"github.com/gocrane/crane/pkg/prediction/config"
"github.com/gocrane/crane/pkg/providers"
Expand Down Expand Up @@ -37,7 +38,10 @@ type Options struct {
DataSourceGrpcConfig providers.GrpcConfig

// AlgorithmModelConfig
AlgorithmModelConfig config.AlgorithmModelConfig
AlgorithmModelConfig config.AlgorithmModelConfig
EnableCheckpointer bool
CheckpointerStore string
CheckpointerLocalConfig checkpoint.LocalStoreConfig

// WebhookConfig
WebhookConfig webhooks.WebhookConfig
Expand Down Expand Up @@ -107,6 +111,12 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) {
flags.StringVar(&o.DataSourceGrpcConfig.Address, "grpc-ds-address", "localhost:50051", "grpc data source server address")
flags.DurationVar(&o.DataSourceGrpcConfig.Timeout, "grpc-ds-timeout", time.Minute, "grpc timeout")
flags.DurationVar(&o.AlgorithmModelConfig.UpdateInterval, "model-update-interval", 12*time.Hour, "algorithm model update interval, now used for dsp model update interval")

flags.BoolVar(&o.EnableCheckpointer, "enable-checkpointer", false, "algorithm model checkpointer, if you want to do checkpoint, you can enable it")
flags.StringVar(&o.CheckpointerStore, "checkpointer-store", "local", "type of the checkpointer, different checkpointer has different storage type. default is local")
flags.StringVar(&o.CheckpointerLocalConfig.Root, "checkpointer-local-root", ".", "local checkpointer root path which checkpoint data stored in, make sure your app has permission to read/write")
flags.IntVar(&o.CheckpointerLocalConfig.MaxWorkers, "checkpointer-local-max-workers", 4, "local checkpointer max workers to do read/write")

flags.BoolVar(&o.WebhookConfig.Enabled, "webhook-enabled", true, "whether enable webhook or not, default to true")
flags.StringVar(&o.RecommendationConfigFile, "recommendation-config-file", "", "recommendation configuration file")
flags.StringSliceVar(&o.EhpaControllerConfig.PropagationConfig.LabelPrefixes, "ehpa-propagation-label-prefixes", []string{}, "propagate labels whose key has the prefix to hpa")
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ go 1.17

require (
github.com/go-echarts/go-echarts/v2 v2.2.4
github.com/gocrane/api v0.4.1-0.20220520134105-09d430d903ac
github.com/google/cadvisor v0.39.2
github.com/mjibson/go-dsp v0.0.0-20180508042940-11479a337f12
github.com/prometheus/client_golang v1.11.0
Expand Down Expand Up @@ -149,7 +148,6 @@ require (
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/warnings.v0 v0.1.1 // indirect
Expand All @@ -169,7 +167,8 @@ require (
require (
cloud.google.com/go v0.84.0 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/fsnotify/fsnotify v1.5.1
github.com/gocrane/api v0.5.0
github.com/json-iterator/go v1.1.12 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/robfig/cron/v3 v3.0.1
Expand All @@ -181,6 +180,7 @@ require (
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
golang.org/x/tools v0.1.8 // indirect
google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect
google.golang.org/protobuf v1.27.1
)

replace (
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -310,10 +310,8 @@ github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og=
github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
github.com/gobwas/ws v1.1.0-rc.5 h1:QOAag7FoBaBYYHRqzqkhhd8fq5RTubvI4v3Ft/gDVVQ=
github.com/gobwas/ws v1.1.0-rc.5/go.mod h1:nzvNcVha5eUziGrbxFCo6qFIojQHjJV5cLYIbezhfL0=
github.com/gocrane/api v0.4.1-0.20220507041258-d376db2b4ad4 h1:vGDg3G6y661KAlhjf/8/r8JCjaIi6aV8szCP+MZRU3Y=
github.com/gocrane/api v0.4.1-0.20220507041258-d376db2b4ad4/go.mod h1:GxI+t9AW8+NsHkz2JkPBIJN//9eLUjTZl1ScYAbXMbk=
github.com/gocrane/api v0.4.1-0.20220520134105-09d430d903ac h1:lBKVVOA4del0Plj80PCE+nglxaJxaXanCv5N6a3laVY=
github.com/gocrane/api v0.4.1-0.20220520134105-09d430d903ac/go.mod h1:GxI+t9AW8+NsHkz2JkPBIJN//9eLUjTZl1ScYAbXMbk=
github.com/gocrane/api v0.5.0 h1:hKPt1T8T/vBEtMyWhz976ZHG8w+Z4NuHpp5+eixcw1A=
github.com/gocrane/api v0.5.0/go.mod h1:GxI+t9AW8+NsHkz2JkPBIJN//9eLUjTZl1ScYAbXMbk=
github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/godbus/dbus/v5 v5.0.4 h1:9349emZab16e7zQvpmsbtjc18ykshndd8y2PG3sgJbA=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
Expand Down
70 changes: 70 additions & 0 deletions pkg/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package checkpoint

import (
"context"
"fmt"
"sync"
"time"

"github.com/gocrane/crane/pkg/internal"
"github.com/gocrane/crane/pkg/metricnaming"
)

// Checkpointer stores and loads model checkpoints for metric namers. This package is only responsible for
// executing the store and load of checkpoint data.
// Other checkpoint writers and readers can be implemented on top of different storages such as the local
// file system, S3 or etcd (Custom Resource Definition).
// The caller decides when to checkpoint, so the checkpoint frequency depends on the caller.
// There are multiple ways to decide when to checkpoint:
// 1. The predictor checkpoints all metric namers together periodically in an independent routine. This does not
// guarantee that the checkpoint data is consistent with the latest updated model in memory.
// 2. The predictor checkpoints a metric namer each time after its model is updated, so the checkpoint is always
// the latest. For example, the percentile predictor checkpoints after adding a sample for each metric namer.
// 3. An application caller such as evpa triggers the checkpoint of the metric namer, i.e. the trigger is
// delegated to the application caller.
type Checkpointer interface {
// Start receives a stop channel; presumably the checkpointer runs until stopCh is closed — TODO confirm
// against the concrete implementations.
Start(stopCh <-chan struct{})
Writer
Reader
}

// Writer stores metric namer model checkpoints.
type Writer interface {
// StoreMetricModelCheckpoint stores the checkpoint of a metric namer. Each call overrides the existing
// checkpoint data of the same metric namer if there is any; only one replica is stored per metric namer model.
// This is the synchronous way: it blocks until the store operation has finished.
StoreMetricModelCheckpoint(ctx context.Context, checkpoint *internal.CheckpointContext, now time.Time) error
// AsyncStoreMetricModelCheckpoint is the asynchronous way: it sends the checkpoint to a channel and returns
// immediately.
AsyncStoreMetricModelCheckpoint(ctx context.Context, checkpoint *internal.CheckpointContext, now time.Time) error
// Flush closes the checkpointer: it closes the queue and waits until all requests pending in the queue are done.
Flush()
}

// Reader loads metric namer model checkpoints.
type Reader interface {
// LoadMetricModelCheckpoint loads the model checkpoint of the given metric namer.
LoadMetricModelCheckpoint(ctx context.Context, namer metricnaming.MetricNamer) (*internal.MetricNamerModelCheckpoint, error)
}

// StoreType names the kind of store a checkpointer is backed by; it is the key under which
// checkpointer factories are registered.
type StoreType string

const (
// StoreTypeLocal is the "local" store type.
StoreTypeLocal StoreType = "local"
// StoreTypeK8s is the "k8s" store type.
StoreTypeK8s StoreType = "k8s"
)

// Factory creates a Checkpointer from an untyped configuration value.
type Factory func(cfg interface{}) (Checkpointer, error)

var (
// checkpointFactorys maps each registered store type to its factory.
checkpointFactorys = make(map[StoreType]Factory)
// lock guards access to checkpointFactorys.
lock sync.Mutex
)

// RegisterFactory records factory as the constructor for storeType in the
// package-level registry, replacing any factory previously registered under
// the same store type.
func RegisterFactory(storeType StoreType, factory Factory) {
	lock.Lock()
	checkpointFactorys[storeType] = factory
	lock.Unlock()
}

// InitCheckpointer creates a Checkpointer for storeType by invoking the
// factory registered for it with cfg.
//
// It returns an error when no factory has been registered for storeType;
// otherwise it returns whatever the factory returns.
func InitCheckpointer(storeType StoreType, cfg interface{}) (Checkpointer, error) {
	// Hold the registry lock only for the lookup. Running the factory outside
	// the lock keeps a slow construction from blocking other registry users and
	// avoids a self-deadlock if a factory touches the registry (sync.Mutex is
	// not reentrant).
	lock.Lock()
	factory, ok := checkpointFactorys[storeType]
	lock.Unlock()

	if !ok {
		return nil, fmt.Errorf("not registered checkpoint store type %v", storeType)
	}
	return factory(cfg)
}
3 changes: 3 additions & 0 deletions pkg/checkpoint/k8s.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package checkpoint

// TODO: define the Kubernetes CRD for metric namer model checkpoints.
Loading

0 comments on commit 1d0091f

Please sign in to comment.