Skip to content

Commit

Permalink
Task concurrency not being respected influxdata#23582
Browse files Browse the repository at this point in the history
  • Loading branch information
redaianer committed Nov 8, 2022
1 parent de247ba commit f15410e
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 49 deletions.
13 changes: 7 additions & 6 deletions cmd/influxd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,15 +451,16 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
query.QueryServiceBridge{AsyncQueryService: m.queryController},
)

executor, executorMetrics := executor.NewExecutor(
m.log.With(zap.String("service", "task-executor")),
exec, executorMetrics := executor.NewExecutor(
m.log.With(zap.String("service", "task-exec")),
query.QueryServiceBridge{AsyncQueryService: m.queryController},
ts.UserService,
combinedTaskService,
combinedTaskService,
executor.WithFlagger(m.flagger),
)
m.executor = executor
exec.SetLimitFunc(executor.ConcurrencyLimit(exec, fluxlang.DefaultService))
m.executor = exec
m.reg.MustRegister(executorMetrics.PrometheusCollectors()...)
schLogger := m.log.With(zap.String("service", "task-scheduler"))

Expand All @@ -470,7 +471,7 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
err error
)
sch, sm, err = scheduler.NewScheduler(
executor,
exec,
taskbackend.NewSchedulableTaskService(m.kvService),
scheduler.WithOnErrorFn(func(ctx context.Context, taskID scheduler.ID, scheduledAt time.Time, err error) {
schLogger.Info(
Expand Down Expand Up @@ -499,7 +500,7 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
taskCoord := coordinator.NewCoordinator(
coordLogger,
sch,
executor)
exec)

taskSvc = middleware.New(combinedTaskService, taskCoord)
if err := taskbackend.TaskNotifyCoordinatorOfExisting(
Expand All @@ -508,7 +509,7 @@ func (m *Launcher) run(ctx context.Context, opts *InfluxdOpts) (err error) {
combinedTaskService,
taskCoord,
func(ctx context.Context, taskID platform2.ID, runID platform2.ID) error {
_, err := executor.ResumeCurrentRun(ctx, taskID, runID)
_, err := exec.ResumeCurrentRun(ctx, taskID, runID)
return err
},
coordLogger); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion influxql/query/internal/internal.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 15 additions & 15 deletions pkg/tracing/wire/binary.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion storage/reads/datatypes/predicate.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 23 additions & 23 deletions storage/reads/datatypes/storage_common.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion tsdb/internal/fieldsindex.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion v1/services/meta/internal/meta.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion v1/services/storage/source.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit f15410e

Please sign in to comment.