@@ -4,77 +4,82 @@ import (
4
4
"sync"
5
5
)
6
6
7
- type indexedValue struct {
8
- value interface {}
9
- index int64
10
- }
11
-
12
- // NewCirque creates a FIFO parallel queue that runs a given processor function on each job, similar to a parallel Map.
7
+ // NewCirque creates a FIFO parallel queue that runs a given
8
+ // processor function on each job, similar to a parallel Map.
13
9
//
14
- // The method accepts a parallelism number, which the maximum number of jobs that are processed simultaneously,
15
- // and a processor function that takes a job as input and returns a indexedValue as output. The processor function must be safe
10
+ // The method accepts a parallelism number, which the maximum
11
+ // number of jobs that are processed simultaneously,
12
+ // and a processor function that takes an input and returns
13
+ // an output. The processor function must be safe
16
14
// to call from multiple goroutines.
17
15
//
18
- // It returns two channels, one into which inputs can be passed, and one from which outputs can be read.
19
- // Closing the input channel will close the output channel after processing is complete. Do not close the output channel yourself.
20
- func NewCirque (parallelism int64 , processor func (interface {}) interface {}) (chan <- interface {}, <- chan interface {}) {
21
- input := make (chan interface {})
22
- output := make (chan interface {})
16
+ // It returns two channels, one into which inputs can be passed,
17
+ // and one from which outputs can be read. Closing the input channel
18
+ // will close the output channel after processing is complete. Do not
19
+ // close the output channel yourself.
20
+ func NewCirque [I any , O any ](parallelism int64 , processor func (I ) O ) (chan <- I , <- chan O ) {
21
+ inputChannel := make (chan I )
22
+ outputChannel := make (chan O )
23
+
24
+ inputHolder := NewSyncMap [int64 , I ]()
25
+ outputHolder := NewSyncMap [int64 , O ]()
26
+
27
+ // let the output goroutine know every time an input is processed, so it
28
+ // can wake up and try to send outputs
29
+ processCompletionSignal := make (chan struct {})
30
+
31
+ // apply backpressure to make sure we're processing inputs only when outputs are
32
+ // actually being collected - otherwise we're going to fill up memory with processed
33
+ // jobs that aren't being taken out.
34
+ outputBackpressureSignal := make (chan struct {}, parallelism )
23
35
24
- processedJobs := make (chan indexedValue )
25
- semaphore := make (chan struct {}, parallelism )
26
36
go func () { // process inputs
27
- poolWaiter := sync.WaitGroup {}
28
- pool := make (chan indexedValue )
37
+ inflightInputs := sync.WaitGroup {}
38
+ inputPool := make (chan int64 )
29
39
30
40
// Start worker pool of specified size
31
- for workerID := int64 (0 ); workerID < parallelism ; workerID ++ {
32
- poolWaiter .Add (1 )
41
+ for n := int64 (0 ); n < parallelism ; n ++ {
42
+ inflightInputs .Add (1 )
33
43
go func () {
34
- for job := range pool {
35
- processedJobs <- indexedValue {
36
- value : processor (job . value ),
37
- index : job . index ,
38
- }
44
+ for index := range inputPool {
45
+ input , _ := inputHolder . Get ( index )
46
+ outputHolder . Set ( index , processor (input ))
47
+ inputHolder . Delete ( index )
48
+ processCompletionSignal <- struct {}{ }
39
49
}
40
- poolWaiter .Done ()
50
+ inflightInputs .Done ()
41
51
}()
42
52
}
43
53
44
54
index := int64 (0 )
45
- for job := range input {
46
- pool <- indexedValue {
47
- value : job ,
48
- index : index ,
49
- }
50
- index = index + 1
51
- semaphore <- struct {}{}
55
+ for input := range inputChannel {
56
+ inputHolder .Set (index , input )
57
+ inputPool <- index
58
+ index ++
59
+ outputBackpressureSignal <- struct {}{}
52
60
}
53
- close (pool )
61
+ close (inputPool )
54
62
55
- poolWaiter .Wait ()
56
- close (processedJobs )
63
+ inflightInputs .Wait ()
64
+ close (processCompletionSignal )
57
65
}()
58
66
59
67
go func () { // send outputs in order
60
68
nextIndex := int64 (0 )
61
- storedResults := map [int64 ]indexedValue {}
62
- for res := range processedJobs {
63
- storedResults [res .index ] = res
64
- canSend := true
65
- for canSend {
66
- if storedResult , ok := storedResults [nextIndex ]; ok {
67
- output <- storedResult .value
68
- delete (storedResults , storedResult .index )
69
- nextIndex = nextIndex + 1
70
- <- semaphore
69
+ for range processCompletionSignal {
70
+ for true {
71
+ if output , ok := outputHolder .Get (nextIndex ); ok {
72
+ outputChannel <- output
73
+ outputHolder .Delete (nextIndex )
74
+ nextIndex ++
75
+ <- outputBackpressureSignal
71
76
} else {
72
- canSend = false
77
+ break
73
78
}
74
79
}
75
80
}
76
- close (output )
81
+ close (outputChannel )
77
82
}()
78
83
79
- return input , output
84
+ return inputChannel , outputChannel
80
85
}
0 commit comments