From c401669b5c3e2b79a3319acc5075d405e3a17e73 Mon Sep 17 00:00:00 2001 From: Dilip Tadepalli Date: Fri, 15 Mar 2024 17:27:48 +0530 Subject: [PATCH] Add configurable workers for each controller, defaults to 10 Signed-off-by: Dilip Tadepalli --- cmd/kcp/help.go | 30 +++++ cmd/kcp/help_test.go | 74 ++++++++++++ cmd/kcp/kcp.go | 8 +- pkg/server/controllers.go | 212 +++++++++++++++++++++++++++++----- pkg/server/options/options.go | 5 + pkg/server/server.go | 25 +++- 6 files changed, 321 insertions(+), 33 deletions(-) create mode 100644 cmd/kcp/help_test.go diff --git a/cmd/kcp/help.go b/cmd/kcp/help.go index 2879d7064d7..f597f2a6101 100644 --- a/cmd/kcp/help.go +++ b/cmd/kcp/help.go @@ -19,6 +19,8 @@ package main import ( "fmt" "io" + "strconv" + "strings" "github.com/spf13/cobra" "github.com/spf13/pflag" @@ -67,3 +69,31 @@ func printMostImportantFlags(w io.Writer, fss cliflag.NamedFlagSets, cols int, v cliflag.PrintSections(w, filteredFFS, cols) } + +func parseControllerSettings(flagString string) map[string]int { + settingsMap := make(map[string]int) + + // Split the flag string based on commas + settings := strings.Split(flagString, ",") + + // Iterate over each setting + for _, setting := range settings { + // Split the setting into key and value based on '=' + parts := strings.Split(setting, "=") + if len(parts) == 2 { + key := strings.TrimSpace(parts[0]) + value := strings.TrimSpace(parts[1]) + num, err := strconv.Atoi(value) + if err != nil { + fmt.Println("Error:", err) + continue + } + settingsMap[key] = num + } else { + // Handle invalid format + fmt.Printf("Invalid format for setting: %s\n", setting) + } + } + + return settingsMap +} diff --git a/cmd/kcp/help_test.go b/cmd/kcp/help_test.go new file mode 100644 index 00000000000..431a94522b4 --- /dev/null +++ b/cmd/kcp/help_test.go @@ -0,0 +1,74 @@ +/* +Copyright 2022 The KCP Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "reflect" + "testing" +) + +func Test_parseControllerSettings(t *testing.T) { + type args struct { + flagString string + } + + expected := map[string]int{ + "kcp-workspace": 2, + } + + expected2 := map[string]int{ + "kcp-workspace": 2, + "kcp-logicalcluster-deletion": 3, + } + + emptyMap := map[string]int{} + + tests := []struct { + name string + args args + want map[string]int + }{ + { + name: "Single Valid flag", + args: args{ + flagString: "kcp-workspace=2", + }, + want: expected, + }, + { + name: "Invalid argument", + args: args{ + flagString: "kcp-workspace", + }, + want: emptyMap, + }, + { + name: "Multiple Valid arguments", + args: args{ + flagString: "kcp-workspace=2,kcp-logicalcluster-deletion=3", + }, + want: expected2, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := parseControllerSettings(tt.args.flagString); !reflect.DeepEqual(got, tt.want) { + t.Errorf("parseControllerSettings() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/cmd/kcp/kcp.go b/cmd/kcp/kcp.go index b847c3b785b..6e634a9ab70 100644 --- a/cmd/kcp/kcp.go +++ b/cmd/kcp/kcp.go @@ -68,6 +68,7 @@ func main() { // manually extract root directory from flags first as it influence all other flags rootDir := ".kcp" + controllerThreadsStr := "" for i, f := range os.Args { if f == "--root-directory" { if i < len(os.Args)-1 { @@ -76,6 +77,10 @@ func main() { } else if strings.HasPrefix(f, "--root-directory=") { rootDir = strings.TrimPrefix(f, "--root-directory=") } + + if strings.HasPrefix(f, "--controller-threads=") { + controllerThreadsStr = strings.TrimPrefix(f, "--controller-threads=") + } } serverOptions := options.NewOptions(rootDir) @@ -136,7 +141,8 @@ func main() { } } - s, err := server.NewServer(completedConfig) + controllerThreads := parseControllerSettings(controllerThreadsStr) + s, err := server.NewServer(completedConfig, server.WithControllerThreads(controllerThreads)) if err != nil { return err } diff --git a/pkg/server/controllers.go b/pkg/server/controllers.go index 0995c7f0531..6a2986b402a 100644 --- a/pkg/server/controllers.go +++ b/pkg/server/controllers.go @@ -85,6 +85,8 @@ import ( kcpinformers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions" ) +const defaultWorkerThreads = 10 + type RunFunc func(ctx context.Context) type WaitFunc func(ctx context.Context, s *Server) error @@ -187,10 +189,15 @@ func (s *Server) installKubeNamespaceController(ctx context.Context, config *res corev1.FinalizerKubernetes, ) + numOfThreads, ok := s.controllerThreads[controllerName] + if !ok { + numOfThreads = defaultWorkerThreads + } + return s.registerController(&controllerWrapper{ Name: controllerName, Runner: func(ctx context.Context) { - c.Run(ctx, 10) + c.Run(ctx, numOfThreads) }, }) } @@ -213,10 +220,15 @@ func (s *Server) installKubeServiceAccountController(ctx context.Context, config return fmt.Errorf("error creating ServiceAccount controller: %w", err) } + numOfThreads, ok := s.controllerThreads[controllerName] + if !ok { + numOfThreads = defaultWorkerThreads + } + return s.registerController(&controllerWrapper{ Name: controllerName, Runner: func(ctx context.Context) { - c.Run(ctx, 1) + c.Run(ctx, numOfThreads) }, }) } @@ -305,10 +317,15 @@ func (s *Server) installRootCAConfigMapController(ctx context.Context, config *r return fmt.Errorf("error creating %s controller: %w", controllerName, err) } + numOfThreads, ok := s.controllerThreads[controllerName] + if !ok { + numOfThreads = defaultWorkerThreads + } + return s.registerController(&controllerWrapper{ Name: controllerName, Runner: func(ctx context.Context) { - c.Run(ctx, 2) + c.Run(ctx, numOfThreads) }, }) } @@ -339,10 +356,15 @@ func (s *Server) installTenancyLogicalClusterController(ctx context.Context, con s.KubeSharedInformerFactory.Rbac().V1().ClusterRoleBindings(), ) + numOfThreads, ok := s.controllerThreads[tenancylogicalcluster.ControllerName] + if !ok { + numOfThreads = defaultWorkerThreads + } + return s.registerController(&controllerWrapper{ Name: tenancylogicalcluster.ControllerName, Runner: func(ctx context.Context) { - controller.Start(ctx, 10) + controller.Start(ctx, numOfThreads) }, }) } @@ -383,10 +405,15 @@ func (s *Server) installLogicalClusterDeletionController(ctx context.Context, co s.KcpSharedInformerFactory.Apis().V1alpha1().APIBindings(), ) + numOfThreads, ok := s.controllerThreads[logicalclusterdeletion.ControllerName] + if !ok { + numOfThreads = defaultWorkerThreads + } + return s.registerController(&controllerWrapper{ Name: logicalclusterdeletion.ControllerName, Runner: func(ctx context.Context) { - logicalClusterDeletionController.Start(ctx, 10) + logicalClusterDeletionController.Start(ctx, numOfThreads) }, }) } @@ -425,10 +452,15 @@ func (s *Server) installWorkspaceScheduler(ctx context.Context, config *rest.Con return err } + numOfThreads, ok := s.controllerThreads[workspace.ControllerName] + if !ok { + numOfThreads = defaultWorkerThreads + } + if err := s.registerController(&controllerWrapper{ Name: workspace.ControllerName, Runner: func(ctx context.Context) { - workspaceController.Start(ctx, 2) + workspaceController.Start(ctx, numOfThreads) }, }); err != nil { return err @@ -452,10 +484,15 @@ func (s *Server) installWorkspaceScheduler(ctx context.Context, config *rest.Con } } if workspaceShardController != nil { + numOfThreads, ok := s.controllerThreads[shard.ControllerName] + if !ok { + numOfThreads = defaultWorkerThreads + } + if err := s.registerController(&controllerWrapper{ Name: shard.ControllerName, Runner: func(ctx context.Context) { - workspaceShardController.Start(ctx, 2) + workspaceShardController.Start(ctx, numOfThreads) }, }); err != nil { return err @@ -478,10 +515,15 @@ func (s *Server) installWorkspaceScheduler(ctx context.Context, config *rest.Con return err } + numOfThreads, ok = s.controllerThreads[workspacetype.ControllerName] + if !ok { + numOfThreads = defaultWorkerThreads + } + if err := s.registerController(&controllerWrapper{ Name: workspacetype.ControllerName, Runner: func(ctx context.Context) { - workspaceTypeController.Start(ctx, 2) + workspaceTypeController.Start(ctx, numOfThreads) }, }); err != nil { return err @@ -515,10 +557,15 @@ func (s *Server) installWorkspaceScheduler(ctx context.Context, config *rest.Con return err } + numOfThreads, ok = s.controllerThreads[universalControllerName] + if !ok { + numOfThreads = defaultWorkerThreads + } + return s.registerController(&controllerWrapper{ Name: universalControllerName, Runner: func(ctx context.Context) { - universalController.Start(ctx, 2) + universalController.Start(ctx, numOfThreads) }, }) } @@ -552,10 +599,15 @@ func (s *Server) installWorkspaceMountsScheduler(ctx context.Context, config *re return err } + numOfThreads, ok := s.controllerThreads[workspacemounts.ControllerName] + if !ok { + numOfThreads = defaultWorkerThreads + } + return s.registerController(&controllerWrapper{ Name: workspacemounts.ControllerName, Runner: func(ctx context.Context) { - workspaceMountsController.Start(ctx, 2) + workspaceMountsController.Start(ctx, numOfThreads) }, }) } @@ -577,10 +629,15 @@ func (s *Server) installLogicalCluster(ctx context.Context, config *rest.Config) return err } + numOfThreads, ok := s.controllerThreads[logicalclusterctrl.ControllerName] + if !ok { + numOfThreads = defaultWorkerThreads + } + return s.registerController(&controllerWrapper{ Name: logicalclusterctrl.ControllerName, Runner: func(ctx context.Context) { - logicalClusterController.Start(ctx, 2) + logicalClusterController.Start(ctx, numOfThreads) }, }) } @@ -616,6 +673,11 @@ func (s *Server) installAPIBindingController(ctx context.Context, config *rest.C return err } + numOfThreads, ok := s.controllerThreads[apibinding.ControllerName] + if !ok { + numOfThreads = defaultWorkerThreads + } + if err := s.registerController(&controllerWrapper{ Name: apibinding.ControllerName, Wait: func(ctx context.Context, s *Server) error { @@ -633,7 +695,7 @@ func (s *Server) installAPIBindingController(ctx context.Context, config *rest.C }) }, Runner: func(ctx context.Context) { - c.Start(ctx, 2) + c.Start(ctx, numOfThreads) }, }); err != nil { return err @@ -663,10 +725,15 @@ func (s *Server) installAPIBindingController(ctx context.Context, config *rest.C return err } + numOfThreads, ok = s.controllerThreads[permissionclaimlabel.ControllerName] + if !ok { + numOfThreads = defaultWorkerThreads + } + if err := s.registerController(&controllerWrapper{ Name: permissionclaimlabel.ControllerName, Runner: func(ctx context.Context) { - permissionClaimLabelController.Start(ctx, 5) + permissionClaimLabelController.Start(ctx, numOfThreads) }, }); err != nil { return err @@ -695,10 +762,15 @@ func (s *Server) installAPIBindingController(ctx context.Context, config *rest.C return err } + numOfThreads, ok = s.controllerThreads[permissionclaimlabel.ResourceControllerName] + if !ok { + numOfThreads = defaultWorkerThreads + } + if err := s.registerController(&controllerWrapper{ Name: permissionclaimlabel.ResourceControllerName, Runner: func(ctx context.Context) { - permissionClaimLabelResourceController.Start(ctx, 2) + permissionClaimLabelResourceController.Start(ctx, numOfThreads) }, }); err != nil { return err @@ -721,10 +793,15 @@ func (s *Server) installAPIBindingController(ctx context.Context, config *rest.C s.KcpSharedInformerFactory.Apis().V1alpha1().APIBindings(), ) + numOfThreads, ok = s.controllerThreads[apibindingdeletion.ControllerName] + if !ok { + numOfThreads = defaultWorkerThreads + } + return s.registerController(&controllerWrapper{ Name: apibindingdeletion.ControllerName, Runner: func(ctx context.Context) { - apibindingDeletionController.Start(ctx, 10) + apibindingDeletionController.Start(ctx, numOfThreads) }, }) } @@ -784,13 +861,18 @@ func (s *Server) installAPIBinderController(ctx context.Context, config *rest.Co return err } + numOfThreads, ok := s.controllerThreads[initialization.ControllerName] + if !ok { + numOfThreads = defaultWorkerThreads + } + return s.registerController(&controllerWrapper{ Name: initialization.ControllerName, Runner: func(ctx context.Context) { initializingWorkspacesKcpInformers.Start(ctx.Done()) initializingWorkspacesKcpInformers.WaitForCacheSync(ctx.Done()) - c.Start(ctx, 2) + c.Start(ctx, numOfThreads) }, }) } @@ -813,10 +895,15 @@ func (s *Server) installCRDCleanupController(ctx context.Context, config *rest.C return err } + numOfThreads, ok := s.controllerThreads[crdcleanup.ControllerName] + if !ok { + numOfThreads = defaultWorkerThreads + } + return s.registerController(&controllerWrapper{ Name: crdcleanup.ControllerName, Runner: func(ctx context.Context) { - c.Start(ctx, 2) + c.Start(ctx, numOfThreads) }, }) } @@ -845,6 +932,11 @@ func (s *Server) installAPIExportController(ctx context.Context, config *rest.Co return err } + numOfThreads, ok := s.controllerThreads[apiexport.ControllerName] + if !ok { + numOfThreads = defaultWorkerThreads + } + return s.registerController(&controllerWrapper{ Name: apiexport.ControllerName, Wait: func(ctx context.Context, s *Server) error { @@ -858,7 +950,7 @@ func (s *Server) installAPIExportController(ctx context.Context, config *rest.Co }) }, Runner: func(ctx context.Context) { - c.Start(ctx, 2) + c.Start(ctx, numOfThreads) }, }) } @@ -877,10 +969,15 @@ func (s *Server) installApisReplicateClusterRoleControllers(ctx context.Context, s.KubeSharedInformerFactory.Rbac().V1().ClusterRoleBindings(), ) + numOfThreads, ok := s.controllerThreads[apisreplicateclusterrole.ControllerName] + if !ok { + numOfThreads = defaultWorkerThreads + } + return s.registerController(&controllerWrapper{ Name: apisreplicateclusterrole.ControllerName, Runner: func(ctx context.Context) { - c.Start(ctx, 2) + c.Start(ctx, numOfThreads) }, }) } @@ -900,10 +997,15 @@ func (s *Server) installCoreReplicateClusterRoleControllers(ctx context.Context, s.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters(), ) + numOfThreads, ok := s.controllerThreads[coresreplicateclusterrole.ControllerName] + if !ok { + numOfThreads = defaultWorkerThreads + } + return s.registerController(&controllerWrapper{ Name: coresreplicateclusterrole.ControllerName, Runner: func(ctx context.Context) { - c.Start(ctx, 2) + c.Start(ctx, numOfThreads) }, }) } @@ -922,10 +1024,15 @@ func (s *Server) installApisReplicateClusterRoleBindingControllers(ctx context.C s.KubeSharedInformerFactory.Rbac().V1().ClusterRoles(), ) + numOfThreads, ok := s.controllerThreads[apisreplicateclusterrolebinding.ControllerName] + if !ok { + numOfThreads = defaultWorkerThreads + } + return s.registerController(&controllerWrapper{ Name: apisreplicateclusterrolebinding.ControllerName, Runner: func(ctx context.Context) { - c.Start(ctx, 2) + c.Start(ctx, numOfThreads) }, }) } @@ -944,10 +1051,15 @@ func (s *Server) installApisReplicateLogicalClusterControllers(ctx context.Conte s.KcpSharedInformerFactory.Apis().V1alpha1().APIExports(), ) + numOfThreads, ok := s.controllerThreads[apisreplicatelogicalcluster.ControllerName] + if !ok { + numOfThreads = defaultWorkerThreads + } + return s.registerController(&controllerWrapper{ Name: apisreplicatelogicalcluster.ControllerName, Runner: func(ctx context.Context) { - c.Start(ctx, 2) + c.Start(ctx, numOfThreads) }, }) } @@ -966,10 +1078,15 @@ func (s *Server) installTenancyReplicateLogicalClusterControllers(ctx context.Co s.KcpSharedInformerFactory.Tenancy().V1alpha1().WorkspaceTypes(), ) + numOfThreads, ok := s.controllerThreads[tenancyreplicatelogicalcluster.ControllerName] + if !ok { + numOfThreads = defaultWorkerThreads + } + return s.registerController(&controllerWrapper{ Name: tenancyreplicatelogicalcluster.ControllerName, Runner: func(ctx context.Context) { - c.Start(ctx, 2) + c.Start(ctx, numOfThreads) }, }) } @@ -989,10 +1106,15 @@ func (s *Server) installCoreReplicateClusterRoleBindingControllers(ctx context.C s.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters(), ) + numOfThreads, ok := s.controllerThreads[corereplicateclusterrolebinding.ControllerName] + if !ok { + numOfThreads = defaultWorkerThreads + } + return s.registerController(&controllerWrapper{ Name: corereplicateclusterrolebinding.ControllerName, Runner: func(ctx context.Context) { - c.Start(ctx, 2) + c.Start(ctx, numOfThreads) }, }) } @@ -1011,10 +1133,15 @@ func (s *Server) installTenancyReplicateClusterRoleControllers(ctx context.Conte s.KubeSharedInformerFactory.Rbac().V1().ClusterRoleBindings(), ) + numOfThreads, ok := s.controllerThreads[tenancyreplicateclusterrole.ControllerName] + if !ok { + numOfThreads = defaultWorkerThreads + } + return s.registerController(&controllerWrapper{ Name: tenancyreplicateclusterrole.ControllerName, Runner: func(ctx context.Context) { - c.Start(ctx, 2) + c.Start(ctx, numOfThreads) }, }) } @@ -1033,10 +1160,15 @@ func (s *Server) installTenancyReplicateClusterRoleBindingControllers(ctx contex s.KubeSharedInformerFactory.Rbac().V1().ClusterRoles(), ) + numOfThreads, ok := s.controllerThreads[tenancyreplicateclusterrolebinding.ControllerName] + if !ok { + numOfThreads = defaultWorkerThreads + } + return s.registerController(&controllerWrapper{ Name: tenancyreplicateclusterrolebinding.ControllerName, Runner: func(ctx context.Context) { - c.Start(ctx, 2) + c.Start(ctx, numOfThreads) }, }) } @@ -1062,10 +1194,15 @@ func (s *Server) installAPIExportEndpointSliceController(ctx context.Context, co return err } + numOfThreads, ok := s.controllerThreads[apiexportendpointslice.ControllerName] + if !ok { + numOfThreads = defaultWorkerThreads + } + return s.registerController(&controllerWrapper{ Name: apiexportendpointslice.ControllerName, Runner: func(ctx context.Context) { - c.Start(ctx, 2) + c.Start(ctx, numOfThreads) }, }) } @@ -1089,10 +1226,15 @@ func (s *Server) installPartitionSetController(ctx context.Context, config *rest return err } + numOfThreads, ok := s.controllerThreads[partitionset.ControllerName] + if !ok { + numOfThreads = defaultWorkerThreads + } + return s.registerController(&controllerWrapper{ Name: partitionset.ControllerName, Runner: func(ctx context.Context) { - c.Start(ctx, 2) + c.Start(ctx, numOfThreads) }, }) } @@ -1113,10 +1255,15 @@ func (s *Server) installExtraAnnotationSyncController(ctx context.Context, confi return err } + numOfThreads, ok := s.controllerThreads[extraannotationsync.ControllerName] + if !ok { + numOfThreads = defaultWorkerThreads + } + return s.registerController(&controllerWrapper{ Name: extraannotationsync.ControllerName, Runner: func(ctx context.Context) { - c.Start(ctx, 2) + c.Start(ctx, numOfThreads) }, }) } @@ -1154,10 +1301,15 @@ func (s *Server) installKubeQuotaController( return err } + numOfThreads, ok := s.controllerThreads[kubequota.ControllerName] + if !ok { + numOfThreads = defaultWorkerThreads + } + if err := s.registerController(&controllerWrapper{ Name: kubequota.ControllerName, Runner: func(ctx context.Context) { - c.Start(ctx, 2) + c.Start(ctx, numOfThreads) }, }); err != nil { return err diff --git a/pkg/server/options/options.go b/pkg/server/options/options.go index 6d3b76cb01d..2f4c7771115 100644 --- a/pkg/server/options/options.go +++ b/pkg/server/options/options.go @@ -197,6 +197,11 @@ Prefixing with - or + means to remove from the default set or add to the default fs.Var(kcpfeatures.NewFlagValue(), "feature-gates", ""+ "A set of key=value pairs that describe feature gates for alpha/experimental features. "+ "Options are:\n"+strings.Join(kcpfeatures.KnownFeatures(), "\n")) // hide kube-only gates + + // add flags that are filtered out from upstream, but overridden here with our own version + fs.Var(kcpfeatures.NewFlagValue(), "controller-settings", ""+ + "A set of key=value pairs that manage controller settings for alpha/experimental features. "+ + "Options are:\n"+strings.Join(kcpfeatures.KnownFeatures(), "\n")) // hide kube-only gates } func (o *CompletedOptions) Validate() []error { diff --git a/pkg/server/server.go b/pkg/server/server.go index 7906d64839d..17a605136fe 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -61,6 +61,9 @@ import ( const resyncPeriod = 10 * time.Hour +// Option configures optional parameters for the controller. +type Option func(s *Server) + type Server struct { CompletedConfig @@ -72,7 +75,20 @@ type Server struct { syncedCh chan struct{} rootPhase1FinishedCh chan struct{} - controllers map[string]*controllerWrapper + controllers map[string]*controllerWrapper + controllerThreads map[string]int // controller to num of worker threads map. +} + +// WithControllerThreads configures num of worker threads per controller. +/* +Example: + --controller-settings=WorkspaceControllerThreads=2,SecretControllerThreads=3,AnotherSetting=10" + the controller names are matched with the names during registring a controller. +*/ +func WithControllerThreads(csMap map[string]int) Option { + return func(c *Server) { + c.controllerThreads = csMap + } } func (s *Server) AddPostStartHook(name string, hook genericapiserver.PostStartHookFunc) error { @@ -83,7 +99,7 @@ func (s *Server) AddPreShutdownHook(name string, hook genericapiserver.PreShutdo return s.MiniAggregator.GenericAPIServer.AddPreShutdownHook(name, hook) } -func NewServer(c CompletedConfig) (*Server, error) { +func NewServer(c CompletedConfig, opts ...Option) (*Server, error) { s := &Server{ CompletedConfig: c, syncedCh: make(chan struct{}), @@ -91,6 +107,11 @@ func NewServer(c CompletedConfig) (*Server, error) { controllers: make(map[string]*controllerWrapper), } + // apply options + for _, opt := range opts { + opt(s) + } + notFoundHandler := notfoundhandler.New(c.GenericConfig.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey) var err error s.ApiExtensions, err = c.ApiExtensions.New(genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))