Skip to content

Commit bbe9b32

Browse files
committed
feat(pool): add support for dynamically adjusting max concurrency
1 parent e2470f0 commit bbe9b32

File tree

4 files changed

+233
-7
lines changed

4 files changed

+233
-7
lines changed

README.md

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ Some common use cases include:
4242
- Panics recovery (panics are captured and returned as errors)
4343
- Subpools with a fraction of the parent pool's maximum number of workers
4444
- Blocking and non-blocking submission of tasks when the queue is full
45+
- Dynamic resizing of the pool
4546
- [API reference](https://pkg.go.dev/github.com/alitto/pond/v2)
4647

4748
## Installation
@@ -406,6 +407,40 @@ When a pool defines a queue size (bounded), you can also specify how to handle t
406407
pool := pond.NewPool(1, pond.WithQueueSize(10), pond.WithNonBlocking(true))
407408
```
408409

410+
### Resizing pools (v2)
411+
412+
You can dynamically change the maximum number of workers in a pool using the `Resize` method. This is useful when you need to adjust the pool's capacity based on runtime conditions.
413+
414+
``` go
415+
// Create a pool with 5 workers
416+
pool := pond.NewPool(5)
417+
418+
// Submit some tasks
419+
for i := 0; i < 20; i++ {
420+
pool.Submit(func() {
421+
// Do some work
422+
})
423+
}
424+
425+
// Increase the pool size to 10 workers
426+
pool.Resize(10)
427+
428+
// Submit more tasks that will use the increased capacity
429+
for i := 0; i < 20; i++ {
430+
pool.Submit(func() {
431+
// Do some work
432+
})
433+
}
434+
435+
// Decrease the pool size back to 5 workers
436+
pool.Resize(5)
437+
```
438+
439+
When resizing a pool:
440+
- The new maximum concurrency must be greater than 0
441+
- If you increase the size, new workers will be created as needed up to the new maximum
442+
- If you decrease the size, existing workers will continue running until they complete their current tasks, but no new workers will be created until the number of running workers is below the new maximum
443+
409444
### Metrics & monitoring
410445

411446
Each worker pool instance exposes useful metrics that can be queried through the following methods:

pool.go

Lines changed: 60 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@ const (
2020
)
2121

2222
var (
23-
ErrQueueFull = errors.New("queue is full")
24-
ErrQueueEmpty = errors.New("queue is empty")
25-
ErrPoolStopped = errors.New("pool stopped")
23+
ErrQueueFull = errors.New("queue is full")
24+
ErrQueueEmpty = errors.New("queue is empty")
25+
ErrPoolStopped = errors.New("pool stopped")
26+
ErrMaxConcurrencyReached = errors.New("max concurrency reached")
2627

2728
poolStoppedFuture = func() Task {
2829
future, resolve := future.NewFuture(context.Background())
@@ -73,6 +74,11 @@ type basePool interface {
7374

7475
// Returns true if the pool has been stopped or its context has been cancelled.
7576
Stopped() bool
77+
78+
// Resizes the pool by changing the maximum concurrency (number of workers) of the pool.
79+
// The new max concurrency must be greater than 0.
80+
	// If the new max concurrency is less than the current number of running workers, existing workers finish their current tasks and excess workers then exit; no new workers are started until the running count drops below the new maximum.
81+
Resize(maxConcurrency int)
7682
}
7783

7884
// Represents a pool of goroutines that can execute tasks concurrently.
@@ -125,9 +131,43 @@ func (p *pool) Stopped() bool {
125131
}
126132

127133
func (p *pool) MaxConcurrency() int {
134+
p.mutex.Lock()
135+
defer p.mutex.Unlock()
136+
128137
return p.maxConcurrency
129138
}
130139

140+
func (p *pool) Resize(maxConcurrency int) {
141+
if maxConcurrency <= 0 {
142+
panic(errors.New("maxConcurrency must be greater than 0"))
143+
}
144+
145+
p.mutex.Lock()
146+
defer p.mutex.Unlock()
147+
148+
delta := maxConcurrency - p.maxConcurrency
149+
150+
p.maxConcurrency = maxConcurrency
151+
152+
if delta > 0 {
153+
// Increase the number of workers by delta if there are tasks in the queue
154+
queuedTasks := int(p.tasks.Len())
155+
156+
for i := 0; i < delta && i < queuedTasks; i++ {
157+
p.workerCount.Add(1)
158+
159+
if p.parent == nil {
160+
// Launch a new worker
161+
p.workerWaitGroup.Add(1)
162+
go p.worker(nil)
163+
} else {
164+
// Submit a task to the parent pool
165+
p.subpoolSubmit(nil)
166+
}
167+
}
168+
}
169+
}
170+
131171
func (p *pool) QueueSize() int {
132172
return p.queueSize
133173
}
@@ -165,9 +205,11 @@ func (p *pool) worker(task any) {
165205

166206
var readTaskErr, err error
167207
for {
168-
_, err = invokeTask[any](task)
208+
if task != nil {
209+
_, err = invokeTask[any](task)
169210

170-
p.updateMetrics(err)
211+
p.updateMetrics(err)
212+
}
171213

172214
task, readTaskErr = p.readTask()
173215

@@ -304,9 +346,11 @@ func (p *pool) subpoolSubmit(task any) error {
304346
return p.parent.submit(func() (output any, err error) {
305347
defer p.workerWaitGroup.Done()
306348

307-
output, err = invokeTask[any](task)
349+
if task != nil {
350+
output, err = invokeTask[any](task)
308351

309-
p.updateMetrics(err)
352+
p.updateMetrics(err)
353+
}
310354

311355
// Attempt to submit the next task to the parent pool
312356
if task, err := p.readTask(); err == nil {
@@ -338,6 +382,15 @@ func (p *pool) readTask() (task any, err error) {
338382
return
339383
}
340384

385+
if p.maxConcurrency > 0 && int(p.workerCount.Load()) > p.maxConcurrency {
386+
// Max concurrency reached, kill the worker
387+
p.workerCount.Add(-1)
388+
p.mutex.Unlock()
389+
390+
err = ErrMaxConcurrencyReached
391+
return
392+
}
393+
341394
task, _ = p.tasks.Read()
342395

343396
p.mutex.Unlock()

pool_test.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,3 +274,68 @@ func TestPoolWithQueueSizeAndNonBlocking(t *testing.T) {
274274

275275
pool.Stop().Wait()
276276
}
277+
278+
func TestPoolResize(t *testing.T) {
279+
280+
pool := NewPool(1, WithQueueSize(10))
281+
282+
assert.Equal(t, 1, pool.MaxConcurrency())
283+
284+
taskStarted := make(chan struct{}, 10)
285+
taskWait := make(chan struct{}, 10)
286+
287+
// Submit 10 tasks
288+
for i := 0; i < 10; i++ {
289+
pool.Submit(func() {
290+
<-taskStarted
291+
<-taskWait
292+
})
293+
}
294+
295+
// Unblock 3 tasks
296+
for i := 0; i < 3; i++ {
297+
taskStarted <- struct{}{}
298+
}
299+
300+
// Verify only 1 task is running and 9 are waiting
301+
time.Sleep(10 * time.Millisecond)
302+
assert.Equal(t, uint64(9), pool.WaitingTasks())
303+
assert.Equal(t, int64(1), pool.RunningWorkers())
304+
305+
// Increase max concurrency to 3
306+
pool.Resize(3)
307+
assert.Equal(t, 3, pool.MaxConcurrency())
308+
309+
// Unblock 3 more tasks
310+
for i := 0; i < 3; i++ {
311+
taskStarted <- struct{}{}
312+
}
313+
314+
// Verify 3 tasks are running and 7 are waiting
315+
time.Sleep(10 * time.Millisecond)
316+
assert.Equal(t, uint64(7), pool.WaitingTasks())
317+
assert.Equal(t, int64(3), pool.RunningWorkers())
318+
319+
// Decrease max concurrency to 1
320+
pool.Resize(2)
321+
assert.Equal(t, 2, pool.MaxConcurrency())
322+
323+
// Complete the 3 running tasks
324+
for i := 0; i < 3; i++ {
325+
taskWait <- struct{}{}
326+
}
327+
328+
// Unblock all remaining tasks
329+
for i := 0; i < 4; i++ {
330+
taskStarted <- struct{}{}
331+
}
332+
333+
// Ensure 2 tasks are running and 5 are waiting
334+
time.Sleep(10 * time.Millisecond)
335+
assert.Equal(t, uint64(5), pool.WaitingTasks())
336+
assert.Equal(t, int64(2), pool.RunningWorkers())
337+
338+
close(taskWait)
339+
340+
pool.Stop().Wait()
341+
}

subpool_test.go

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,3 +334,76 @@ func TestSubpoolWithQueueSizeOverride(t *testing.T) {
334334
subpool.StopAndWait()
335335
pool.StopAndWait()
336336
}
337+
338+
func TestSubpoolResize(t *testing.T) {
339+
340+
parentPool := NewPool(10, WithQueueSize(10))
341+
342+
pool := parentPool.NewSubpool(1)
343+
344+
assert.Equal(t, 1, pool.MaxConcurrency())
345+
assert.Equal(t, 10, parentPool.MaxConcurrency())
346+
347+
taskStarted := make(chan struct{}, 10)
348+
taskWait := make(chan struct{}, 10)
349+
350+
// Submit 10 tasks
351+
for i := 0; i < 10; i++ {
352+
pool.Submit(func() {
353+
<-taskStarted
354+
<-taskWait
355+
})
356+
}
357+
358+
// Unblock 3 tasks
359+
for i := 0; i < 3; i++ {
360+
taskStarted <- struct{}{}
361+
}
362+
363+
// Verify only 1 task is running and 9 are waiting
364+
time.Sleep(10 * time.Millisecond)
365+
assert.Equal(t, uint64(9), pool.WaitingTasks())
366+
assert.Equal(t, int64(1), pool.RunningWorkers())
367+
assert.Equal(t, int64(1), parentPool.RunningWorkers())
368+
369+
// Increase max concurrency to 3
370+
pool.Resize(3)
371+
assert.Equal(t, 3, pool.MaxConcurrency())
372+
assert.Equal(t, 10, parentPool.MaxConcurrency())
373+
374+
// Unblock 3 more tasks
375+
for i := 0; i < 3; i++ {
376+
taskStarted <- struct{}{}
377+
}
378+
379+
// Verify 3 tasks are running and 7 are waiting
380+
time.Sleep(10 * time.Millisecond)
381+
assert.Equal(t, uint64(7), pool.WaitingTasks())
382+
assert.Equal(t, int64(3), pool.RunningWorkers())
383+
assert.Equal(t, int64(3), parentPool.RunningWorkers())
384+
385+
// Decrease max concurrency to 1
386+
pool.Resize(2)
387+
assert.Equal(t, 2, pool.MaxConcurrency())
388+
assert.Equal(t, 10, parentPool.MaxConcurrency())
389+
390+
// Complete the 3 running tasks
391+
for i := 0; i < 3; i++ {
392+
taskWait <- struct{}{}
393+
}
394+
395+
// Unblock all remaining tasks
396+
for i := 0; i < 4; i++ {
397+
taskStarted <- struct{}{}
398+
}
399+
400+
// Ensure 2 tasks are running and 5 are waiting
401+
time.Sleep(10 * time.Millisecond)
402+
assert.Equal(t, uint64(5), pool.WaitingTasks())
403+
assert.Equal(t, int64(2), pool.RunningWorkers())
404+
assert.Equal(t, int64(2), parentPool.RunningWorkers())
405+
406+
close(taskWait)
407+
408+
pool.Stop().Wait()
409+
}

0 commit comments

Comments
 (0)