From 61159bfe56b3e2b4d94d0f5a738babba4e3f9b10 Mon Sep 17 00:00:00 2001
From: kitianFresh <1549722424@qq.com>
Date: Wed, 4 May 2022 20:26:38 +0800
Subject: [PATCH] recover the model from a checkpoint first if one exists; the
 checkpoint store is local

---
 cmd/craned/app/manager.go                     |  31 ++-
 cmd/craned/app/options/options.go             |  12 +-
 go.mod                                        |   6 +-
 go.sum                                        |   6 +-
 pkg/checkpoint/checkpoint.go                  |  70 +++++++
 pkg/checkpoint/k8s.go                         |   3 +
 pkg/checkpoint/local.go                       | 195 ++++++++++++++++++
 pkg/internal/checkpoint.go                    |  37 ++++
 pkg/metricnaming/naming.go                    |   7 +
 pkg/prediction/dsp/prediction.go              |   3 +
 pkg/prediction/percentile/aggregate_signal.go |  27 ++-
 pkg/prediction/percentile/prediction.go       | 164 +++++++++++++--
 pkg/predictor/predictor.go                    |  11 +-
 13 files changed, 541 insertions(+), 31 deletions(-)
 create mode 100644 pkg/checkpoint/checkpoint.go
 create mode 100644 pkg/checkpoint/k8s.go
 create mode 100644 pkg/checkpoint/local.go
 create mode 100644 pkg/internal/checkpoint.go

diff --git a/cmd/craned/app/manager.go b/cmd/craned/app/manager.go
index a30cadf58..ae1d6c470 100644
--- a/cmd/craned/app/manager.go
+++ b/cmd/craned/app/manager.go
@@ -3,6 +3,7 @@ package app
 import (
     "context"
     "flag"
+    "fmt"
    "os"
     "strings"
 
@@ -25,6 +26,7 @@ import (
     predictionapi "github.com/gocrane/api/prediction/v1alpha1"
 
     "github.com/gocrane/crane/cmd/craned/app/options"
+    "github.com/gocrane/crane/pkg/checkpoint"
     "github.com/gocrane/crane/pkg/controller/analytics"
     "github.com/gocrane/crane/pkg/controller/cnp"
     "github.com/gocrane/crane/pkg/controller/ehpa"
@@ -108,7 +110,11 @@ func Run(ctx context.Context, opts *options.Options) error {
     }
     // initialize data sources and predictor
     realtimeDataSources, histroyDataSources, _ := initDataSources(mgr, opts)
-    predictorMgr := initPredictorManager(opts, realtimeDataSources, histroyDataSources)
+    predictorMgr, err := initPredictorManager(opts, realtimeDataSources, histroyDataSources)
+    if err != nil {
+        klog.ErrorS(err, "Failed to init predictor manager")
+        return err
+    }
 
     initScheme()
     initWebhooks(mgr, opts)
@@ -197,8 +203,27 @@ func initDataSources(mgr ctrl.Manager, opts *options.Options) (map[providers.Dat
     return realtimeDataSources, historyDataSources, hybridDataSources
 }
 
-func initPredictorManager(opts *options.Options, realtimeDataSources map[providers.DataSourceType]providers.RealTime, historyDataSources map[providers.DataSourceType]providers.History) predictor.Manager {
-    return predictor.NewManager(realtimeDataSources, historyDataSources, predictor.DefaultPredictorsConfig(opts.AlgorithmModelConfig))
+func initPredictorManager(opts *options.Options, realtimeDataSources map[providers.DataSourceType]providers.RealTime, historyDataSources map[providers.DataSourceType]providers.History) (predictor.Manager, error) {
+    cpStoreType := checkpoint.StoreType(opts.CheckpointerStore)
+    var checkpointer checkpoint.Checkpointer
+    var err error
+    if opts.EnableCheckpointer {
+        switch cpStoreType {
+        case checkpoint.StoreTypeLocal:
+            // NewLocal asserts the config to *LocalStoreConfig, so pass a pointer here
+            checkpointer, err = checkpoint.InitCheckpointer(cpStoreType, &opts.CheckpointerLocalConfig)
+            if err != nil {
+                return nil, err
+            }
+        default:
+            return nil, fmt.Errorf("unsupported checkpointer store type %v", cpStoreType)
+        }
+    }
+
+    ctx := predictor.CheckPointerContext{
+        Enable:       opts.EnableCheckpointer,
+        Checkpointer: checkpointer,
+    }
+    return predictor.NewManager(realtimeDataSources, historyDataSources, predictor.DefaultPredictorsConfig(opts.AlgorithmModelConfig), ctx), nil
 }
 
 // initControllers setup controllers with manager
diff --git a/cmd/craned/app/options/options.go b/cmd/craned/app/options/options.go
index faa1ae5c0..fccb558e6 100644
--- a/cmd/craned/app/options/options.go
+++ b/cmd/craned/app/options/options.go
@@ -6,6 +6,7 @@ import (
     "github.com/spf13/pflag"
     componentbaseconfig "k8s.io/component-base/config"
 
+    "github.com/gocrane/crane/pkg/checkpoint"
     "github.com/gocrane/crane/pkg/controller/ehpa"
     "github.com/gocrane/crane/pkg/prediction/config"
     "github.com/gocrane/crane/pkg/providers"
@@ -37,7 +38,10 @@ type Options struct {
     DataSourceGrpcConfig providers.GrpcConfig
 
     // AlgorithmModelConfig
-    AlgorithmModelConfig config.AlgorithmModelConfig
+    AlgorithmModelConfig    config.AlgorithmModelConfig
+    EnableCheckpointer      bool
+    CheckpointerStore       string
+    CheckpointerLocalConfig checkpoint.LocalStoreConfig
 
     // WebhookConfig
     WebhookConfig webhooks.WebhookConfig
@@ -107,6 +111,12 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) {
     flags.StringVar(&o.DataSourceGrpcConfig.Address, "grpc-ds-address", "localhost:50051", "grpc data source server address")
     flags.DurationVar(&o.DataSourceGrpcConfig.Timeout, "grpc-ds-timeout", time.Minute, "grpc timeout")
     flags.DurationVar(&o.AlgorithmModelConfig.UpdateInterval, "model-update-interval", 12*time.Hour, "algorithm model update interval, now used for dsp model update interval")
+
+    flags.BoolVar(&o.EnableCheckpointer, "enable-checkpointer", false, "enable the algorithm model checkpointer if you want models to be checkpointed")
+    flags.StringVar(&o.CheckpointerStore, "checkpointer-store", "local", "storage type of the checkpointer; different checkpointers use different storage backends, default is local")
+    flags.StringVar(&o.CheckpointerLocalConfig.Root, "checkpointer-local-root", ".", "root path the local checkpointer stores checkpoint data in; make sure the app has permission to read and write it")
+    flags.IntVar(&o.CheckpointerLocalConfig.MaxWorkers, "checkpointer-local-max-workers", 4, "max workers the local checkpointer uses for reads and writes")
+
     flags.BoolVar(&o.WebhookConfig.Enabled, "webhook-enabled", true, "whether enable webhook or not, default to true")
     flags.StringVar(&o.RecommendationConfigFile, "recommendation-config-file", "", "recommendation configuration file")
     flags.StringSliceVar(&o.EhpaControllerConfig.PropagationConfig.LabelPrefixes, "ehpa-propagation-label-prefixes", []string{}, "propagate labels whose key has the prefix to hpa")
diff --git a/go.mod b/go.mod
index f18108b85..0ccf4dca2 100644
--- a/go.mod
+++ b/go.mod
@@ -4,7 +4,6 @@ go 1.17
 
 require (
     github.com/go-echarts/go-echarts/v2 v2.2.4
-    github.com/gocrane/api v0.4.1-0.20220520134105-09d430d903ac
     github.com/google/cadvisor v0.39.2
     github.com/mjibson/go-dsp v0.0.0-20180508042940-11479a337f12
     github.com/prometheus/client_golang v1.11.0
@@ -149,7 +148,6 @@ require (
     golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
     gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect
     google.golang.org/appengine v1.6.7 // indirect
-    google.golang.org/protobuf v1.27.1 // indirect
     gopkg.in/inf.v0 v0.9.1 // indirect
     gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
     gopkg.in/warnings.v0 v0.1.1 // indirect
@@ -169,7 +167,8 @@ require (
 require (
     cloud.google.com/go v0.84.0 // indirect
     github.com/cespare/xxhash/v2 v2.1.2 // indirect
-    github.com/fsnotify/fsnotify v1.5.1 // indirect
+    github.com/fsnotify/fsnotify v1.5.1
+    github.com/gocrane/api v0.5.0
     github.com/json-iterator/go v1.1.12 // indirect
     github.com/mattn/go-isatty v0.0.14 // indirect
     github.com/robfig/cron/v3 v3.0.1
@@ -181,6 +180,7 @@ require (
     golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
     golang.org/x/tools v0.1.8 // indirect
     google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect
+    google.golang.org/protobuf v1.27.1
 )
 
 replace (
diff --git a/go.sum b/go.sum
index d7eaf7871..11513a5b1 100644
--- a/go.sum
+++ b/go.sum
@@ -310,10 +310,8 @@ github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og=
 github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
 github.com/gobwas/ws v1.1.0-rc.5 h1:QOAag7FoBaBYYHRqzqkhhd8fq5RTubvI4v3Ft/gDVVQ=
 github.com/gobwas/ws v1.1.0-rc.5/go.mod h1:nzvNcVha5eUziGrbxFCo6qFIojQHjJV5cLYIbezhfL0=
-github.com/gocrane/api v0.4.1-0.20220507041258-d376db2b4ad4 h1:vGDg3G6y661KAlhjf/8/r8JCjaIi6aV8szCP+MZRU3Y=
-github.com/gocrane/api v0.4.1-0.20220507041258-d376db2b4ad4/go.mod h1:GxI+t9AW8+NsHkz2JkPBIJN//9eLUjTZl1ScYAbXMbk=
-github.com/gocrane/api v0.4.1-0.20220520134105-09d430d903ac h1:lBKVVOA4del0Plj80PCE+nglxaJxaXanCv5N6a3laVY=
-github.com/gocrane/api v0.4.1-0.20220520134105-09d430d903ac/go.mod h1:GxI+t9AW8+NsHkz2JkPBIJN//9eLUjTZl1ScYAbXMbk=
+github.com/gocrane/api v0.5.0 h1:hKPt1T8T/vBEtMyWhz976ZHG8w+Z4NuHpp5+eixcw1A=
+github.com/gocrane/api v0.5.0/go.mod h1:GxI+t9AW8+NsHkz2JkPBIJN//9eLUjTZl1ScYAbXMbk=
 github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
 github.com/godbus/dbus/v5 v5.0.4 h1:9349emZab16e7zQvpmsbtjc18ykshndd8y2PG3sgJbA=
 github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
diff --git a/pkg/checkpoint/checkpoint.go b/pkg/checkpoint/checkpoint.go
new file mode 100644
index 000000000..1bc70be63
--- /dev/null
+++ b/pkg/checkpoint/checkpoint.go
@@ -0,0 +1,70 @@
+package checkpoint
+
+import (
+    "context"
+    "fmt"
+    "sync"
+    "time"
+
+    "github.com/gocrane/crane/pkg/internal"
+    "github.com/gocrane/crane/pkg/metricnaming"
+)
+
+// Checkpointer checkpoints metric namers. This package is only responsible for storing and loading checkpoint data.
+// You can implement other checkpoint writers and readers backed by different storages such as a local fs, s3, or etcd (as a Custom Resource Definition).
+// The caller decides when to checkpoint; the checkpoint frequency is up to the caller.
+// There are multiple ways to decide when to checkpoint:
+//  1. The predictor checkpoints all metric namers together, periodically, in an independent routine. This does not guarantee the checkpoint data is consistent with the latest updated model in memory.
+//  2. The predictor checkpoints a metric namer each time its model is updated, so the checkpoint is always the latest; for example, the percentile predictor checkpoints after adding samples for each metric namer.
+//  3. An application caller such as EVPA triggers the metric namer checkpoint, delegating the trigger to the application caller.
+type Checkpointer interface {
+    Start(stopCh <-chan struct{})
+    Writer
+    Reader
+}
+
+type Writer interface {
+    // StoreMetricModelCheckpoint stores a metric namer's checkpoint. Each call overrides the previous checkpoint data of the same metric namer if it exists,
+    // so each metric namer model checkpoint stores only one replica.
+    // This is the sync way: it blocks until the store operation has finished.
+    StoreMetricModelCheckpoint(ctx context.Context, checkpoint *internal.CheckpointContext, now time.Time) error
+    // AsyncStoreMetricModelCheckpoint is the async way: it sends the checkpoint to a channel and returns immediately.
+    AsyncStoreMetricModelCheckpoint(ctx context.Context, checkpoint *internal.CheckpointContext, now time.Time) error
+    // Flush closes the checkpointer: it closes the queues and waits until all requests pending in them are done.
+    Flush()
+}
+
+type Reader interface {
+    // LoadMetricModelCheckpoint loads a metric namer's checkpoint.
+    LoadMetricModelCheckpoint(ctx context.Context, namer metricnaming.MetricNamer) (*internal.MetricNamerModelCheckpoint, error)
+}
+
+type StoreType string
+
+const (
+    StoreTypeLocal StoreType = "local"
+    StoreTypeK8s   StoreType = "k8s"
+)
+
+type Factory func(cfg interface{}) (Checkpointer, error)
+
+var (
+    checkpointFactories = make(map[StoreType]Factory)
+    lock                sync.Mutex
+)
+
+func RegisterFactory(storeType StoreType, factory Factory) {
+    lock.Lock()
+    defer lock.Unlock()
+    checkpointFactories[storeType] = factory
+}
+
+func InitCheckpointer(storeType StoreType, cfg interface{}) (Checkpointer, error) {
+    lock.Lock()
+    defer lock.Unlock()
+    if factory, ok := checkpointFactories[storeType]; ok {
+        return factory(cfg)
+    }
+    return nil, fmt.Errorf("checkpoint store type %v is not registered", storeType)
+}
diff --git a/pkg/checkpoint/k8s.go b/pkg/checkpoint/k8s.go
new file mode 100644
index 000000000..de7955071
--- /dev/null
+++ b/pkg/checkpoint/k8s.go
@@ -0,0 +1,3 @@
+package checkpoint
+
+// todo: define the k8s CRD for metricnamer model checkpoints
diff --git a/pkg/checkpoint/local.go b/pkg/checkpoint/local.go
new file mode 100644
index 000000000..b4f4da9e1
--- /dev/null
+++ b/pkg/checkpoint/local.go
@@ -0,0 +1,195 @@
+package checkpoint
+
+import (
+    "context"
+    "crypto/md5"
+    "encoding/json"
+    "fmt"
+    "io/ioutil"
+    "os"
+    "path/filepath"
+    "time"
+
+    "k8s.io/apimachinery/pkg/util/runtime"
+    "k8s.io/klog/v2"
+
+    "github.com/gocrane/crane/pkg/internal"
+    "github.com/gocrane/crane/pkg/metricnaming"
+)
+
+var _ Checkpointer = &Local{}
+
+// Local uses the local filesystem as checkpoint storage, so if you use Local, craned needs a persistent volume (such as CBS) to keep the state.
+type Local struct {
+    StoreRoot                   string
+    MaxWorkers                  int
+    checkpointStoreRequestsChan chan *checkpointStoreRequest
+    checkpointLoadRequestsChan  chan *checkpointLoadRequest
+    internalReaderFinish        chan struct{}
+    internalWriterFinish        chan struct{}
+    globalStop                  <-chan struct{}
+}
+
+type checkpointStoreRequest struct {
+    data *internal.CheckpointContext
+}
+
+type checkpointLoadRequest struct {
+    namer metricnaming.MetricNamer
+    resp  chan *checkpointLoadResponse
+}
+
+type checkpointLoadResponse struct {
+    data *internal.MetricNamerModelCheckpoint
+    err  error
+}
+
+func (l *Local) Start(stopCh <-chan struct{}) {
+    l.globalStop = stopCh
+    writerRoutine := func() {
+        for request := range l.checkpointStoreRequestsChan {
+            err := l.write(request.data)
+            if err != nil {
+                klog.ErrorS(err, "Failed to store checkpoint", "namer", request.data.Namer.BuildUniqueKey())
+            }
+        }
+        l.internalWriterFinish <- struct{}{}
+    }
+
+    readerRoutine := func() {
+        for request := range l.checkpointLoadRequestsChan {
+            data, err := l.read(request.namer)
+            if err != nil {
+                klog.ErrorS(err, "Failed to load checkpoint", "namer", request.namer.BuildUniqueKey())
+            }
+            request.resp <- &checkpointLoadResponse{data: data, err: err}
+        }
+        l.internalReaderFinish <- struct{}{}
+    }
+    for i := 0; i < l.MaxWorkers; i++ {
+        go writerRoutine()
+        go readerRoutine()
+    }
+}
+
+func SafeCloseStore(ch chan *checkpointStoreRequest) (justClosed bool) {
+    defer func() {
+        if recover() != nil {
+            justClosed = false
+        }
+    }()
+
+    // assume ch != nil here.
+    close(ch)   // panics if ch is already closed
+    return true // <=> justClosed = true; return
+}
+
+func SafeCloseLoad(ch chan *checkpointLoadRequest) (justClosed bool) {
+    defer func() {
+        if recover() != nil {
+            justClosed = false
+        }
+    }()
+
+    // assume ch != nil here.
+    close(ch)   // panics if ch is already closed
+    return true // <=> justClosed = true; return
+}
+
+// Flush flushes all pending write requests to disk, blocking until all of them have finished
+func (l *Local) Flush() {
+    SafeCloseLoad(l.checkpointLoadRequestsChan)
+    SafeCloseStore(l.checkpointStoreRequestsChan)
+    // wait for every reader and writer worker to drain its queue and finish
+    for i := 0; i < l.MaxWorkers; i++ {
+        <-l.internalReaderFinish
+        <-l.internalWriterFinish
+    }
+    klog.V(4).Infof("Flushed all checkpoint requests")
+}
+
+func (l *Local) write(ctx *internal.CheckpointContext) error {
+    bytes, err := json.Marshal(ctx.Data)
+    if err != nil {
+        return err
+    }
+    return ioutil.WriteFile(l.checkPointFileName(ctx.Namer), bytes, os.ModePerm)
+}
+
+func (l *Local) read(namer metricnaming.MetricNamer) (*internal.MetricNamerModelCheckpoint, error) {
+    bytes, err := ioutil.ReadFile(l.checkPointFileName(namer))
+    if err != nil {
+        return nil, err
+    }
+    data := &internal.MetricNamerModelCheckpoint{}
+    err = json.Unmarshal(bytes, data)
+    if err != nil {
+        return nil, err
+    }
+    return data, nil
+}
+
+func (l *Local) checkPointFileName(namer metricnaming.MetricNamer) string {
+    // Use md5 to shorten the namer key so the file name does not get too long. Each unique metric namer maps to a unique file name, so this is safe.
+    // But we cannot recover the unique key from the md5 digest (which does not matter for now, because we do not need to); maybe a reversible encoding can replace this later.
+    key := namer.BuildUniqueKey()
+    encoded := fmt.Sprintf("%x", md5.Sum([]byte(key)))
+    return filepath.Join(l.StoreRoot, encoded)
+}
+
+func (l *Local) StoreMetricModelCheckpoint(ctx context.Context, checkpoint *internal.CheckpointContext, now time.Time) error {
+    return l.write(checkpoint)
+}
+
+func (l *Local) AsyncStoreMetricModelCheckpoint(ctx context.Context, checkpoint *internal.CheckpointContext, now time.Time) error {
+    defer runtime.HandleCrash()
+    select {
+    case <-ctx.Done():
+        return ctx.Err()
+    case l.checkpointStoreRequestsChan <- &checkpointStoreRequest{data: checkpoint}:
+        return nil
+    }
+}
+
+func (l *Local) LoadMetricModelCheckpoint(ctx context.Context, namer metricnaming.MetricNamer) (*internal.MetricNamerModelCheckpoint, error) {
+    respChan := make(chan *checkpointLoadResponse, 1)
+    select {
+    case <-ctx.Done():
+        return nil, ctx.Err()
+    case l.checkpointLoadRequestsChan <- &checkpointLoadRequest{namer: namer, resp: respChan}:
+    }
+    select {
+    case <-ctx.Done():
+        return nil, ctx.Err()
+    case resp := <-respChan:
+        return resp.data, resp.err
+    }
+}
+
+func NewLocal(config interface{}) (Checkpointer, error) {
+    cfg, ok := config.(*LocalStoreConfig)
+    if !ok {
+        return nil, fmt.Errorf("config type must be *LocalStoreConfig")
+    }
+    if err := os.MkdirAll(cfg.Root, os.ModePerm); err != nil {
+        return nil, err
+    }
+    return &Local{
+        // buffer one finish signal per worker so Flush can wait for all of them
+        internalWriterFinish:        make(chan struct{}, cfg.MaxWorkers),
+        internalReaderFinish:        make(chan struct{}, cfg.MaxWorkers),
+        StoreRoot:                   cfg.Root,
+        MaxWorkers:                  cfg.MaxWorkers,
+        checkpointLoadRequestsChan:  make(chan *checkpointLoadRequest, 32),
+        checkpointStoreRequestsChan: make(chan *checkpointStoreRequest, 32),
+    }, nil
+}
+
+type LocalStoreConfig struct {
+    Root       string
+    MaxWorkers int
+}
+
+func init() {
+    RegisterFactory(StoreTypeLocal, NewLocal)
+}
diff --git a/pkg/internal/checkpoint.go b/pkg/internal/checkpoint.go
new file mode 100644
index 000000000..002e48fff
--- /dev/null
+++ b/pkg/internal/checkpoint.go
@@ -0,0 +1,37 @@
+package internal
+
+import (
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/autoscaling.k8s.io/v1"
+
+    "github.com/gocrane/crane/pkg/metricnaming"
+    "github.com/gocrane/crane/pkg/metricquery"
+)
+
+const (
+    SupportedCheckpointVersion = "v1"
+)
+
+// todo: move this to the api repo later
+type MetricNamerModelCheckpoint struct {
+    Metric *metricquery.Metric
+    // Last update time of the checkpoint data
+    LastUpdateTime metav1.Time
+    // FirstSampleStart of the model
+    FirstSampleStart metav1.Time
+    // LastSampleStart of the model
+    LastSampleStart metav1.Time
+    // SampleInterval of the model
+    SampleInterval metav1.Duration
+    // TotalSamplesCount of the model
+    TotalSamplesCount uint64
+    // HistogramModel is the percentile histogram model; only the percentile algorithm model is supported now
+    HistogramModel *vpa_types.HistogramCheckpoint
+    // Version is the checkpoint version; different versions may have different formats.
+    Version string
+}
+
+type CheckpointContext struct {
+    Namer metricnaming.MetricNamer
+    Data  *MetricNamerModelCheckpoint
+}
diff --git a/pkg/metricnaming/naming.go b/pkg/metricnaming/naming.go
index a93165811..12a0c70f2 100644
--- a/pkg/metricnaming/naming.go
+++ b/pkg/metricnaming/naming.go
@@ -16,6 +16,9 @@ type MetricNamer interface {
 
     // Means the caller of this MetricNamer, different caller maybe use the same metric
     Caller() string
+
+    // GetMetric returns the metric of the namer
+    GetMetric() *metricquery.Metric
 }
 
 var _ MetricNamer = &GeneralMetricNamer{}
@@ -25,6 +28,10 @@ type GeneralMetricNamer struct {
     CallerName string
 }
 
+func (gmn *GeneralMetricNamer) GetMetric() *metricquery.Metric {
+    return gmn.Metric
+}
+
 func (gmn *GeneralMetricNamer) Caller() string {
     return gmn.CallerName
 }
diff --git a/pkg/prediction/dsp/prediction.go b/pkg/prediction/dsp/prediction.go
index 1aa86d587..780847ebf 100644
--- a/pkg/prediction/dsp/prediction.go
+++ b/pkg/prediction/dsp/prediction.go
@@ -193,6 +193,9 @@ func (p *periodicSignalPrediction) Run(stopCh <-chan struct{}) {
                     p.queryRoutines.Delete(queryExpr)
                     klog.V(4).InfoS("Prediction routine stopped.", "queryExpr", queryExpr)
                     return
+                case <-stopCh:
+                    klog.V(4).InfoS("Craned is stopped. dsp prediction routine stopped.", "queryExpr", queryExpr)
+                    return
                 case <-ticker.C:
                     continue
                 }
diff --git a/pkg/prediction/percentile/aggregate_signal.go b/pkg/prediction/percentile/aggregate_signal.go
index 8e3427b51..d655909b3 100644
--- a/pkg/prediction/percentile/aggregate_signal.go
+++ b/pkg/prediction/percentile/aggregate_signal.go
@@ -7,6 +7,7 @@ import (
     vpa "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/recommender/util"
 
     "github.com/gocrane/crane/pkg/common"
+    "github.com/gocrane/crane/pkg/internal"
 )
 
 type aggregateSignal struct {
@@ -14,7 +15,7 @@ type aggregateSignal struct {
     firstSampleTime   time.Time
     lastSampleTime    time.Time
     minSampleWeight   float64
-    totalSamplesCount int
+    totalSamplesCount uint64
     sampleInterval    time.Duration
     creationTime      time.Time
     labels            []common.Label
@@ -36,6 +37,13 @@ func (a *aggregateSignal) GetAggregationWindowLength() time.Duration {
     return time.Duration(a.totalSamplesCount) * a.sampleInterval
 }
 
+func (a *aggregateSignal) Expired(now time.Time) bool {
+    if a.totalSamplesCount == 0 {
+        return now.Sub(a.creationTime) >= a.GetAggregationWindowLength()
+    }
+    return now.Sub(a.lastSampleTime) >= a.GetAggregationWindowLength()
+}
+
 func newAggregateSignal(c *internalConfig) *aggregateSignal {
     return &aggregateSignal{
         histogram:       vpa.NewHistogram(c.histogramOptions),
@@ -44,3 +52,20 @@ func newAggregateSignal(c *internalConfig) *aggregateSignal {
         sampleInterval:  c.sampleInterval,
     }
 }
+
+func newAggregateSignalFromCheckpoint(c *internalConfig, checkpoint *internal.MetricNamerModelCheckpoint) (*aggregateSignal, error) {
+    hist := vpa.NewHistogram(c.histogramOptions)
+    err := hist.LoadFromCheckpoint(checkpoint.HistogramModel)
+    if err != nil {
+        return nil, err
+    }
+    return &aggregateSignal{
+        histogram:         hist,
+        firstSampleTime:   checkpoint.FirstSampleStart.Time,
+        lastSampleTime:    checkpoint.LastSampleStart.Time,
+        sampleInterval:    c.sampleInterval,
+        minSampleWeight:   c.minSampleWeight,
+        creationTime:      time.Now(),
+        totalSamplesCount: checkpoint.TotalSamplesCount,
+    }, nil
+}
diff --git a/pkg/prediction/percentile/prediction.go b/pkg/prediction/percentile/prediction.go
index 651dfd242..ebb99ac0c 100644
--- a/pkg/prediction/percentile/prediction.go
+++ b/pkg/prediction/percentile/prediction.go
@@ -6,9 +6,12 @@ import (
     "sync"
     "time"
 
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/klog/v2"
 
+    "github.com/gocrane/crane/pkg/checkpoint"
     "github.com/gocrane/crane/pkg/common"
+    "github.com/gocrane/crane/pkg/internal"
     "github.com/gocrane/crane/pkg/metricnaming"
     "github.com/gocrane/crane/pkg/prediction"
     "github.com/gocrane/crane/pkg/prediction/config"
@@ -22,8 +25,10 @@ type percentilePrediction struct {
     prediction.GenericPrediction
     a aggregateSignals
     // record the query routine already started
-    queryRoutines sync.Map
-    stopChMap     sync.Map
+    queryRoutines      sync.Map
+    stopChMap          sync.Map
+    enableCheckpointer bool
+    checkpointer       checkpoint.Checkpointer
 }
 
 func (p *percentilePrediction) QueryPredictionStatus(_ context.Context, metricNamer metricnaming.MetricNamer) (prediction.Status, error) {
@@ -200,19 +205,31 @@ func (p *percentilePrediction) process(namer metricnaming.MetricNamer, cfg *inte
     return p.getPredictedValuesFromSignals(queryExpr, signals, cfg), nil
 }
 
-func NewPrediction(realtimeProvider providers.RealTime, historyProvider providers.History) prediction.Interface {
+func NewPrediction(realtimeProvider providers.RealTime, historyProvider providers.History, enableCheckpointer bool, checkpointer checkpoint.Checkpointer) prediction.Interface {
     withCh, delCh := make(chan prediction.QueryExprWithCaller), make(chan prediction.QueryExprWithCaller)
     return &percentilePrediction{
-        GenericPrediction: prediction.NewGenericPrediction(realtimeProvider, historyProvider, withCh, delCh),
-        a:                 newAggregateSignals(),
-        queryRoutines:     sync.Map{},
-        stopChMap:         sync.Map{},
+        GenericPrediction:  prediction.NewGenericPrediction(realtimeProvider, historyProvider, withCh, delCh),
+        a:                  newAggregateSignals(),
+        queryRoutines:      sync.Map{},
+        stopChMap:          sync.Map{},
+        enableCheckpointer: enableCheckpointer,
+        checkpointer:       checkpointer,
     }
 }
 
 func (p *percentilePrediction) Run(stopCh <-chan struct{}) {
+    if p.enableCheckpointer {
+        p.checkpointer.Start(stopCh)
+    }
+    var wg sync.WaitGroup
+    wg.Add(1)
     go func() {
+        defer wg.Done()
         for {
-            qc := <-p.WithCh
+            // block until a new query arrives or craned is stopped
+            var qc prediction.QueryExprWithCaller
+            select {
+            case <-stopCh:
+                klog.V(4).InfoS("Craned is stopped. percentile prediction management routine stopped.")
+                return
+            case qc = <-p.WithCh:
+            }
             // update if the query config updated, idempotent
             p.a.Add(qc)
@@ -238,13 +255,27 @@
             var initError error
             switch c.initMode {
             case config.ModelInitModeLazyTraining:
-                p.initByRealTimeProvider(qc.MetricNamer)
+                // first try to recover from a checkpoint if the checkpointer is enabled; on failure, fall back to initByRealTimeProvider
+                var err error
+                if p.enableCheckpointer {
+                    err = p.initByCheckPoint(qc.MetricNamer)
+                    if err != nil {
+                        klog.ErrorS(err, "Failed to recover from checkpoint, initializing from the real-time provider instead")
+                        p.initByRealTimeProvider(qc.MetricNamer)
+                    }
+                } else {
+                    p.initByRealTimeProvider(qc.MetricNamer)
+                }
             case config.ModelInitModeCheckpoint:
-                initError = p.initByCheckPoint(qc.MetricNamer)
+                if p.enableCheckpointer {
+                    initError = p.initByCheckPoint(qc.MetricNamer)
+                } else {
+                    initError = fmt.Errorf("the predictor checkpointer is not enabled")
+                }
             case config.ModelInitModeHistory:
                 fallthrough
             default:
-                // blocking
+                // blocking; the history query may take a long time
                 initError = p.initFromHistory(qc.MetricNamer)
@@ -255,7 +286,9 @@
             // note: same query, but different config means different series analysis, GetConfig always return same config.
             // this is our default policy, one metric only has one config at a time.
+            wg.Add(1)
             go func(namer metricnaming.MetricNamer) {
+                defer wg.Done()
                 queryExpr := namer.BuildUniqueKey()
                 p.queryRoutines.Store(queryExpr, struct{}{})
                 if c := p.a.GetConfig(queryExpr); c != nil {
@@ -272,6 +305,9 @@
                         p.queryRoutines.Delete(queryExpr)
                         klog.V(4).InfoS("Prediction routine stopped.", "queryExpr", queryExpr)
                         return
+                    case <-stopCh:
+                        klog.V(4).InfoS("Craned is stopped. percentile prediction routine stopped.", "queryExpr", queryExpr)
+                        return
                     case <-ticker.C:
                         continue
                     }
@@ -281,7 +317,9 @@
         }
     }()
 
+    wg.Add(1)
     go func() {
+        defer wg.Done()
         for {
-            qc := <-p.DelCh
+            // block until a delete request arrives or craned is stopped
+            var qc prediction.QueryExprWithCaller
+            select {
+            case <-stopCh:
+                klog.V(4).InfoS("Craned is stopped. percentile prediction deletion routine stopped.")
+                return
+            case qc = <-p.DelCh:
+            }
             QueryExpr := qc.MetricNamer.BuildUniqueKey()
@@ -296,12 +334,19 @@
                 }
             }
             }(qc)
         }
     }()
 
     klog.Infof("predictor %v started", p.Name())
+    wg.Wait()
 
-    <-stopCh
+    if p.enableCheckpointer {
+        p.checkpointer.Flush()
+    }
 
     klog.Infof("predictor %v stopped", p.Name())
@@ -335,14 +380,49 @@ func (p *percentilePrediction) initByRealTimeProvider(namer metricnaming.MetricN
         p.a.SetSignalWithStatus(queryExpr, keyAll, signal, prediction.StatusInitializing)
     } else {
         signals := map[string]*aggregateSignal{}
+        // there is just one time series here; we do not support mapping one metric to multiple series without aggregation
+        key := prediction.AggregateSignalKey([]common.Label{})
+        signal := newAggregateSignal(cfg)
+        signals[key] = signal
         p.a.SetSignalsWithStatus(queryExpr, signals, prediction.StatusInitializing)
     }
 }
 
-// todo:
-// nolint:unused
-func (p *percentilePrediction) initByCheckPoint(_ metricnaming.MetricNamer) error {
-    return fmt.Errorf("checkpoint not supported")
+func (p *percentilePrediction) initByCheckPoint(namer metricnaming.MetricNamer) error {
+    ctx, cancelFunc := context.WithTimeout(context.Background(), 1*time.Minute)
+    defer cancelFunc()
+    checkpoint, err := p.checkpointer.LoadMetricModelCheckpoint(ctx, namer)
+    if err != nil {
+        return err
+    }
+    if checkpoint == nil {
+        return fmt.Errorf("no checkpoint restored for %v", namer.BuildUniqueKey())
+    }
+
+    queryExpr := namer.BuildUniqueKey()
+    cfg := p.a.GetConfig(queryExpr)
+    signal, err := newAggregateSignalFromCheckpoint(cfg, checkpoint)
+    if err != nil {
+        return err
+    }
+    // a restored model that already covers the whole history window is ready to serve immediately
+    if cfg.aggregated {
+        if signal.GetAggregationWindowLength() >= cfg.historyLength {
+            p.a.SetSignalWithStatus(queryExpr, keyAll, signal, prediction.StatusReady)
+        } else {
+            p.a.SetSignalWithStatus(queryExpr, keyAll, signal, prediction.StatusInitializing)
+        }
+    } else {
+        signals := map[string]*aggregateSignal{}
+        key := prediction.AggregateSignalKey([]common.Label{})
+        signals[key] = signal
+        if signal.GetAggregationWindowLength() >= cfg.historyLength {
+            p.a.SetSignalsWithStatus(queryExpr, signals, prediction.StatusReady)
+        } else {
+            p.a.SetSignalsWithStatus(queryExpr, signals, prediction.StatusInitializing)
+        }
+    }
+    return nil
 }
 
 func (p *percentilePrediction) initFromHistory(namer metricnaming.MetricNamer) error {
@@ -368,6 +448,7 @@ func (p *percentilePrediction) initFromHistory(namer metricnaming.MetricNamer) e
         p.a.SetSignal(queryExpr, keyAll, signal)
     } else {
         signals := map[string]*aggregateSignal{}
+        // in fact, there must be only one series now; we do not support mapping one metric to many series without aggregation mode
         for _, ts := range historyTimeSeriesList {
             if len(ts.Samples) < 1 {
                 continue
             }
@@ -424,9 +505,23 @@ func (p *percentilePrediction) addSamples(namer metricnaming.MetricNamer) {
             if signal.GetAggregationWindowLength() >= c.historyLength {
                 p.a.SetSignalStatus(queryExpr, keyAll, prediction.StatusReady)
             }
-            klog.V(6).InfoS("Sample added.", "sampleValue", sample.Value, "sampleTime", sampleTime, "queryExpr", queryExpr, "history", c.historyLength, "aggregationWindowLength", signal.GetAggregationWindowLength())
         }
+        if p.enableCheckpointer && signal.GetAggregationWindowLength() >= c.historyLength {
+            data, err := NewMetricNamerCheckpointContext(signal, namer)
+            if err != nil {
+                klog.ErrorS(err, "Failed to get metric checkpoint", "queryExpr", queryExpr, "key", keyAll)
+                return
+            }
+            ctx := context.Background()
+            ctx, cancelFunc := context.WithTimeout(ctx, 5*time.Minute)
+            defer cancelFunc()
+            err = p.checkpointer.AsyncStoreMetricModelCheckpoint(ctx, data, time.Now())
+            if err != nil {
+                klog.ErrorS(err, "Failed to store checkpoint", "queryExpr", queryExpr, "key", keyAll)
+                return
+            }
+        }
     } else {
         // todo: find a way to remove the labels key, although we do not really use it now.
         for _, ts := range latestTimeSeriesList {
@@ -456,8 +551,24 @@
             if signal.GetAggregationWindowLength() >= c.historyLength {
                 p.a.SetSignalStatus(queryExpr, key, prediction.StatusReady)
             }
-            klog.V(6).InfoS("Sample added.", "sampleValue", sample.Value, "sampleTime", sampleTime, "queryExpr", queryExpr, "key", key, "history", c.historyLength, "aggregationWindowLength", signal.GetAggregationWindowLength())
+
+            if p.enableCheckpointer && signal.GetAggregationWindowLength() >= c.historyLength {
+                data, err := NewMetricNamerCheckpointContext(signal, namer)
+                if err != nil {
+                    klog.ErrorS(err, "Failed to get metric checkpoint", "queryExpr", queryExpr, "key", key)
+                    return
+                }
+                ctx := context.Background()
+                ctx, cancelFunc := context.WithTimeout(ctx, 5*time.Minute)
+                err = p.checkpointer.AsyncStoreMetricModelCheckpoint(ctx, data, time.Now())
+                if err != nil {
+                    cancelFunc()
+                    klog.ErrorS(err, "Failed to store checkpoint", "queryExpr", queryExpr, "key", key)
+                    return
+                }
+                cancelFunc()
+            }
         }
     }
 }
@@ -465,3 +576,22 @@
 func (p *percentilePrediction) Name() string {
     return "Percentile"
 }
+
+func NewMetricNamerCheckpointContext(signal *aggregateSignal, namer metricnaming.MetricNamer) (*internal.CheckpointContext, error) {
+    // note: SaveToChekpoint is the upstream VPA histogram method's own spelling
+    histCheckpoint, err := signal.histogram.SaveToChekpoint()
+    if err != nil {
+        return nil, err
+    }
+    return &internal.CheckpointContext{
+        Data: &internal.MetricNamerModelCheckpoint{
+            Metric:            namer.GetMetric(),
+            FirstSampleStart:  metav1.NewTime(signal.firstSampleTime),
+            LastSampleStart:   metav1.NewTime(signal.lastSampleTime),
+            SampleInterval:    metav1.Duration{Duration: signal.sampleInterval},
+            TotalSamplesCount: signal.totalSamplesCount,
+            HistogramModel:    histCheckpoint,
+            Version:           internal.SupportedCheckpointVersion,
+        },
+        Namer: namer,
+    }, nil
+}
diff --git a/pkg/predictor/predictor.go b/pkg/predictor/predictor.go
index 7e85b41e7..2aafe7775 100644
--- a/pkg/predictor/predictor.go
+++ b/pkg/predictor/predictor.go
@@ -8,6 +8,7 @@ import (
 
     predictionapi "github.com/gocrane/api/prediction/v1alpha1"
 
+    "github.com/gocrane/crane/pkg/checkpoint"
     "github.com/gocrane/crane/pkg/prediction"
     predconf "github.com/gocrane/crane/pkg/prediction/config"
"github.com/gocrane/crane/pkg/prediction/dsp" @@ -66,8 +67,14 @@ type manager struct { historyDataProxys map[predictionapi.AlgorithmType]*providers.HistoryDataProxy } +type CheckPointerContext struct { + Enable bool + Checkpointer checkpoint.Checkpointer +} + func NewManager(realtimeProviders map[providers.DataSourceType]providers.RealTime, - historyProviders map[providers.DataSourceType]providers.History, predictorsConfig map[predictionapi.AlgorithmType]Config) Manager { + historyProviders map[providers.DataSourceType]providers.History, predictorsConfig map[predictionapi.AlgorithmType]Config, + checkpointerCtx CheckPointerContext) Manager { m := &manager{ predictors: make(map[predictionapi.AlgorithmType]prediction.Interface), @@ -104,7 +111,7 @@ func NewManager(realtimeProviders map[providers.DataSourceType]providers.RealTim switch algo { case predictionapi.AlgorithmTypePercentile: - pctPredictor := percentile.NewPrediction(algorithmRealTimeProxy, algorithmHistoryProxy) + pctPredictor := percentile.NewPrediction(algorithmRealTimeProxy, algorithmHistoryProxy, checkpointerCtx.Enable, checkpointerCtx.Checkpointer) m.predictors[algo] = pctPredictor m.historyDataProxys[algo] = algorithmHistoryProxy m.realTimeDataProxys[algo] = algorithmRealTimeProxy