-
Notifications
You must be signed in to change notification settings - Fork 207
/
cleaner.go
83 lines (70 loc) · 2.12 KB
/
cleaner.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package rmq
import "math"
type Cleaner struct {
connection Connection
}
func NewCleaner(connection Connection) *Cleaner {
return &Cleaner{connection: connection}
}
// Clean cleans the connection of the cleaner. This is useful to make sure no
// deliveries get lost. The main use case is if your consumers get restarted
// there will be unacked deliveries assigned to the connection. Once the
// heartbeat of that connection dies the cleaner can recognize that and remove
// those unacked deliveries back to the ready list. If there was no error it
// returns the number of deliveries which have been returned from unacked lists
// to ready lists across all cleaned connections and queues.
func (cleaner *Cleaner) Clean() (returned int64, err error) {
connectionNames, err := cleaner.connection.getConnections()
if err != nil {
return 0, err
}
for _, connectionName := range connectionNames {
hijackedConnection := cleaner.connection.hijackConnection(connectionName)
switch err := hijackedConnection.checkHeartbeat(); err {
case nil: // active connection
continue
case ErrorNotFound:
n, err := cleanStaleConnection(hijackedConnection)
if err != nil {
return 0, err
}
returned += n
default:
return 0, err
}
}
return returned, nil
}
func cleanStaleConnection(staleConnection Connection) (returned int64, err error) {
queueNames, err := staleConnection.getConsumingQueues()
if err != nil {
return 0, err
}
for _, queueName := range queueNames {
queue, err := staleConnection.OpenQueue(queueName)
if err != nil {
return 0, err
}
n, err := cleanQueue(queue)
if err != nil {
return 0, err
}
returned += n
}
if err := staleConnection.closeStaleConnection(); err != nil {
return 0, err
}
// log.Printf("rmq cleaner cleaned connection %s", staleConnection)
return returned, nil
}
func cleanQueue(queue Queue) (returned int64, err error) {
returned, err = queue.ReturnUnacked(math.MaxInt64)
if err != nil {
return 0, err
}
if err := queue.closeInStaleConnection(); err != nil {
return 0, err
}
// log.Printf("rmq cleaner cleaned queue %s %d", queue, returned)
return returned, nil
}