diff --git a/pkg/proxy/server.go b/pkg/proxy/server.go index 0c6fe645bd6..df052187227 100644 --- a/pkg/proxy/server.go +++ b/pkg/proxy/server.go @@ -118,6 +118,8 @@ func (s *Server) PrepareRun(ctx context.Context) (preparedServer, error) { return preparedServer{s}, nil } +const workerCount = 10 + func (s preparedServer) Run(ctx context.Context) error { logger := klog.FromContext(ctx).WithValues("component", "proxy") @@ -132,7 +134,7 @@ func (s preparedServer) Run(ctx context.Context) error { } // start index - go s.IndexController.Start(ctx, 2) + go s.IndexController.Start(ctx, workerCount) s.KcpSharedInformerFactory.Start(ctx.Done()) s.KcpSharedInformerFactory.WaitForCacheSync(ctx.Done()) diff --git a/pkg/server/controllers.go b/pkg/server/controllers.go index 0995c7f0531..4efa1f44f15 100644 --- a/pkg/server/controllers.go +++ b/pkg/server/controllers.go @@ -85,6 +85,9 @@ import ( kcpinformers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions" ) +// Default worker count for all controllers. +const workerCount = 10 + type RunFunc func(ctx context.Context) type WaitFunc func(ctx context.Context, s *Server) error @@ -146,7 +149,7 @@ func (s *Server) installClusterRoleAggregationController(ctx context.Context, co return s.registerController(&controllerWrapper{ Name: controllerName, Runner: func(ctx context.Context) { - c.Run(ctx, 5) + c.Run(ctx, workerCount) }, }) } @@ -190,7 +193,7 @@ func (s *Server) installKubeNamespaceController(ctx context.Context, config *res return s.registerController(&controllerWrapper{ Name: controllerName, Runner: func(ctx context.Context) { - c.Run(ctx, 10) + c.Run(ctx, workerCount) }, }) } @@ -216,7 +219,7 @@ func (s *Server) installKubeServiceAccountController(ctx context.Context, config return s.registerController(&controllerWrapper{ Name: controllerName, Runner: func(ctx context.Context) { - c.Run(ctx, 1) + c.Run(ctx, workerCount) }, }) } @@ -308,7 +311,7 @@ func (s *Server) installRootCAConfigMapController(ctx context.Context, config *r return s.registerController(&controllerWrapper{ Name: controllerName, Runner: func(ctx context.Context) { - c.Run(ctx, 2) + c.Run(ctx, workerCount) }, }) } @@ -342,7 +345,7 @@ func (s *Server) installTenancyLogicalClusterController(ctx context.Context, con return s.registerController(&controllerWrapper{ Name: tenancylogicalcluster.ControllerName, Runner: func(ctx context.Context) { - controller.Start(ctx, 10) + controller.Start(ctx, workerCount) }, }) } @@ -386,7 +389,7 @@ func (s *Server) installLogicalClusterDeletionController(ctx context.Context, co return s.registerController(&controllerWrapper{ Name: logicalclusterdeletion.ControllerName, Runner: func(ctx context.Context) { - logicalClusterDeletionController.Start(ctx, 10) + logicalClusterDeletionController.Start(ctx, workerCount) }, }) } @@ -428,7 +431,7 @@ func (s *Server) installWorkspaceScheduler(ctx context.Context, config *rest.Con if err := s.registerController(&controllerWrapper{ Name: workspace.ControllerName, Runner: func(ctx context.Context) { - workspaceController.Start(ctx, 2) + workspaceController.Start(ctx, workerCount) }, }); err != nil { return err @@ -455,7 +458,7 @@ func (s *Server) installWorkspaceScheduler(ctx context.Context, config *rest.Con if err := s.registerController(&controllerWrapper{ Name: shard.ControllerName, Runner: func(ctx context.Context) { - workspaceShardController.Start(ctx, 2) + workspaceShardController.Start(ctx, workerCount) }, }); err != nil { return err @@ -481,7 +484,7 @@ func (s *Server) installWorkspaceScheduler(ctx context.Context, config *rest.Con if err := s.registerController(&controllerWrapper{ Name: workspacetype.ControllerName, Runner: func(ctx context.Context) { - workspaceTypeController.Start(ctx, 2) + workspaceTypeController.Start(ctx, workerCount) }, }); err != nil { return err @@ -518,7 +521,7 @@ func (s *Server) installWorkspaceScheduler(ctx context.Context, config *rest.Con return s.registerController(&controllerWrapper{ Name: universalControllerName, Runner: func(ctx context.Context) { - universalController.Start(ctx, 2) + universalController.Start(ctx, workerCount) }, }) } @@ -555,7 +558,7 @@ func (s *Server) installWorkspaceMountsScheduler(ctx context.Context, config *re return s.registerController(&controllerWrapper{ Name: workspacemounts.ControllerName, Runner: func(ctx context.Context) { - workspaceMountsController.Start(ctx, 2) + workspaceMountsController.Start(ctx, workerCount) }, }) } @@ -580,7 +583,7 @@ func (s *Server) installLogicalCluster(ctx context.Context, config *rest.Config) return s.registerController(&controllerWrapper{ Name: logicalclusterctrl.ControllerName, Runner: func(ctx context.Context) { - logicalClusterController.Start(ctx, 2) + logicalClusterController.Start(ctx, workerCount) }, }) } @@ -633,7 +636,7 @@ func (s *Server) installAPIBindingController(ctx context.Context, config *rest.C }) }, Runner: func(ctx context.Context) { - c.Start(ctx, 2) + c.Start(ctx, workerCount) }, }); err != nil { return err @@ -666,7 +669,7 @@ func (s *Server) installAPIBindingController(ctx context.Context, config *rest.C if err := s.registerController(&controllerWrapper{ Name: permissionclaimlabel.ControllerName, Runner: func(ctx context.Context) { - permissionClaimLabelController.Start(ctx, 5) + permissionClaimLabelController.Start(ctx, workerCount) }, }); err != nil { return err @@ -698,7 +701,7 @@ func (s *Server) installAPIBindingController(ctx context.Context, config *rest.C if err := s.registerController(&controllerWrapper{ Name: permissionclaimlabel.ResourceControllerName, Runner: func(ctx context.Context) { - permissionClaimLabelResourceController.Start(ctx, 2) + permissionClaimLabelResourceController.Start(ctx, workerCount) }, }); err != nil { return err @@ -724,7 +727,7 @@ func (s *Server) installAPIBindingController(ctx context.Context, config *rest.C return s.registerController(&controllerWrapper{ Name: apibindingdeletion.ControllerName, Runner: func(ctx context.Context) { - apibindingDeletionController.Start(ctx, 10) + apibindingDeletionController.Start(ctx, workerCount) }, }) } @@ -789,8 +792,7 @@ func (s *Server) installAPIBinderController(ctx context.Context, config *rest.Co Runner: func(ctx context.Context) { initializingWorkspacesKcpInformers.Start(ctx.Done()) initializingWorkspacesKcpInformers.WaitForCacheSync(ctx.Done()) - - c.Start(ctx, 2) + c.Start(ctx, workerCount) }, }) } @@ -816,7 +818,7 @@ func (s *Server) installCRDCleanupController(ctx context.Context, config *rest.C return s.registerController(&controllerWrapper{ Name: crdcleanup.ControllerName, Runner: func(ctx context.Context) { - c.Start(ctx, 2) + c.Start(ctx, workerCount) }, }) } @@ -858,7 +860,7 @@ func (s *Server) installAPIExportController(ctx context.Context, config *rest.Co }) }, Runner: func(ctx context.Context) { - c.Start(ctx, 2) + c.Start(ctx, workerCount) }, }) } @@ -880,7 +882,7 @@ func (s *Server) installApisReplicateClusterRoleControllers(ctx context.Context, return s.registerController(&controllerWrapper{ Name: apisreplicateclusterrole.ControllerName, Runner: func(ctx context.Context) { - c.Start(ctx, 2) + c.Start(ctx, workerCount) }, }) } @@ -903,7 +905,7 @@ func (s *Server) installCoreReplicateClusterRoleControllers(ctx context.Context, return s.registerController(&controllerWrapper{ Name: coresreplicateclusterrole.ControllerName, Runner: func(ctx context.Context) { - c.Start(ctx, 2) + c.Start(ctx, workerCount) }, }) } @@ -925,7 +927,7 @@ func (s *Server) installApisReplicateClusterRoleBindingControllers(ctx context.C return s.registerController(&controllerWrapper{ Name: apisreplicateclusterrolebinding.ControllerName, Runner: func(ctx context.Context) { - c.Start(ctx, 2) + c.Start(ctx, workerCount) }, }) } @@ -947,7 +949,7 @@ func (s *Server) installApisReplicateLogicalClusterControllers(ctx context.Conte return s.registerController(&controllerWrapper{ Name: apisreplicatelogicalcluster.ControllerName, Runner: func(ctx context.Context) { - c.Start(ctx, 2) + c.Start(ctx, workerCount) }, }) } @@ -969,7 +971,7 @@ func (s *Server) installTenancyReplicateLogicalClusterControllers(ctx context.Co return s.registerController(&controllerWrapper{ Name: tenancyreplicatelogicalcluster.ControllerName, Runner: func(ctx context.Context) { - c.Start(ctx, 2) + c.Start(ctx, workerCount) }, }) } @@ -992,7 +994,7 @@ func (s *Server) installCoreReplicateClusterRoleBindingControllers(ctx context.C return s.registerController(&controllerWrapper{ Name: corereplicateclusterrolebinding.ControllerName, Runner: func(ctx context.Context) { - c.Start(ctx, 2) + c.Start(ctx, workerCount) }, }) } @@ -1014,7 +1016,7 @@ func (s *Server) installTenancyReplicateClusterRoleControllers(ctx context.Conte return s.registerController(&controllerWrapper{ Name: tenancyreplicateclusterrole.ControllerName, Runner: func(ctx context.Context) { - c.Start(ctx, 2) + c.Start(ctx, workerCount) }, }) } @@ -1036,7 +1038,7 @@ func (s *Server) installTenancyReplicateClusterRoleBindingControllers(ctx contex return s.registerController(&controllerWrapper{ Name: tenancyreplicateclusterrolebinding.ControllerName, Runner: func(ctx context.Context) { - c.Start(ctx, 2) + c.Start(ctx, workerCount) }, }) } @@ -1065,7 +1067,7 @@ func (s *Server) installAPIExportEndpointSliceController(ctx context.Context, co return s.registerController(&controllerWrapper{ Name: apiexportendpointslice.ControllerName, Runner: func(ctx context.Context) { - c.Start(ctx, 2) + c.Start(ctx, workerCount) }, }) } @@ -1092,7 +1094,7 @@ func (s *Server) installPartitionSetController(ctx context.Context, config *rest return s.registerController(&controllerWrapper{ Name: partitionset.ControllerName, Runner: func(ctx context.Context) { - c.Start(ctx, 2) + c.Start(ctx, workerCount) }, }) } @@ -1116,7 +1118,7 @@ func (s *Server) installExtraAnnotationSyncController(ctx context.Context, confi return s.registerController(&controllerWrapper{ Name: extraannotationsync.ControllerName, Runner: func(ctx context.Context) { - c.Start(ctx, 2) + c.Start(ctx, workerCount) }, }) } @@ -1157,7 +1159,7 @@ func (s *Server) installKubeQuotaController( if err := s.registerController(&controllerWrapper{ Name: kubequota.ControllerName, Runner: func(ctx context.Context) { - c.Start(ctx, 2) + c.Start(ctx, workerCount) }, }); err != nil { return err @@ -1187,7 +1189,7 @@ func (s *Server) installApiExportIdentityController(ctx context.Context, config return s.registerController(&controllerWrapper{ Name: identitycache.ControllerName, Runner: func(ctx context.Context) { - c.Start(ctx, 1) + c.Start(ctx, workerCount) }, }) } @@ -1202,7 +1204,7 @@ func (s *Server) installReplicationController(ctx context.Context, config *rest. return s.registerController(&controllerWrapper{ Name: replication.ControllerName, Runner: func(ctx context.Context) { - controller.Start(ctx, 2) + controller.Start(ctx, workerCount) }, }) } @@ -1241,7 +1243,7 @@ func (s *Server) installGarbageCollectorController(ctx context.Context, config * return s.registerController(&controllerWrapper{ Name: garbagecollector.ControllerName, Runner: func(ctx context.Context) { - c.Start(ctx, 2) + c.Start(ctx, workerCount) }, }) }