-
Notifications
You must be signed in to change notification settings - Fork 3
/
redis.go
127 lines (108 loc) · 2.86 KB
/
redis.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package RedisStateStore
import (
"fmt"
"strings"
"github.com/go-redis/redis"
faasflow "github.com/s8sg/faas-flow"
)
// RedisStateStore stores per-request state in Redis. GetRedisStateStore
// returns it as a faasflow.StateStore.
type RedisStateStore struct {
// KeyPath is the prefix prepended (followed by ".") to every key the store
// reads or writes. Configure sets it to "faasflow.<flowName>.<requestId>".
KeyPath string
// rds is the Redis client used for every operation; GetRedisStateStore
// assigns it after a successful PING.
rds redis.UniversalClient
// RetryCount is not referenced by any method in this file.
// NOTE(review): presumably intended as a retry budget for Update - confirm.
RetryCount int
}
// Incrementer is implemented by stores that expose an Incr operation on a key.
// In this file *RedisStateStore provides a matching Incr method.
type Incrementer interface {
// Incr takes a key and an int64 value and returns an int64 plus an error.
// RedisStateStore.Incr adds value to the integer stored at key and returns
// the result of that increment.
Incr(key string, value int64) (int64, error)
}
// GetRedisStateStore creates a Redis client, verifies it with a PING and
// returns a state store backed by that client.
//
// address is a comma-separated list of Redis addresses; whitespace around
// each entry is ignored. sentinelName is handed to the client as
// UniversalOptions.MasterName.
//
// If the PING fails the client is closed and the PING error is returned
// unchanged, together with a nil store.
func GetRedisStateStore(address, sentinelName string) (faasflow.StateStore, error) {
	addrs := strings.Split(address, ",")
	for i := range addrs {
		// Tolerate "host1:6379, host2:6379" style lists.
		addrs[i] = strings.TrimSpace(addrs[i])
	}

	client := redis.NewUniversalClient(&redis.UniversalOptions{
		MasterName: sentinelName,
		Addrs:      addrs,
	})
	if err := client.Ping().Err(); err != nil {
		// Do not leak the client when it is unusable. The PING failure is
		// the error worth reporting, so a Close error is deliberately dropped.
		_ = client.Close()
		return nil, err
	}

	return &RedisStateStore{rds: client}, nil
}
// Configure sets the key prefix for this store to
// "faasflow.<flowName>.<requestId>". Update, Incr, Set, Get and Cleanup all
// build their Redis keys from this prefix.
func (this *RedisStateStore) Configure(flowName string, requestId string) {
	prefix := fmt.Sprintf("faasflow.%s.%s", flowName, requestId)
	this.KeyPath = prefix
}
// Init is a no-op for the Redis store and always returns nil
// (called only once in a request).
func (this *RedisStateStore) Init() error {
return nil
}
// Update performs a compare-and-set: the value stored under key is replaced
// with newValue only when the current value equals oldValue.
//
// An error is returned when the key does not exist, when reading it fails,
// when the stored value differs from oldValue, or when the write itself
// fails. The key is passed to Watch, so the write is meant to take effect
// only if the key was left untouched since it was read (see the go-redis
// Watch documentation).
func (this *RedisStateStore) Update(key string, oldValue string, newValue string) error {
	fullKey := this.KeyPath + "." + key

	// compareAndSet runs inside the Watch callback with fullKey watched.
	compareAndSet := func(tx *redis.Tx) error {
		current, getErr := tx.Get(fullKey).Result()
		switch {
		case getErr == redis.Nil:
			return fmt.Errorf("[%v] not exist", fullKey)
		case getErr != nil:
			return fmt.Errorf("unexpect error %v", getErr)
		case current != oldValue:
			return fmt.Errorf("Old value doesn't match for key %s", fullKey)
		}

		// Queue the write; only the error of the pipelined execution matters.
		_, execErr := tx.Pipelined(func(pipe redis.Pipeliner) error {
			pipe.Set(fullKey, newValue, 0)
			return nil
		})
		return execErr
	}

	return this.rds.Watch(compareAndSet, fullKey)
}
// Incr adds value to the integer stored under key (prefixed with KeyPath)
// using IncrBy, and returns the result of that command along with any error.
func (this *RedisStateStore) Incr(key string, value int64) (int64, error) {
	return this.rds.IncrBy(this.KeyPath+"."+key, value).Result()
}
// Set stores value under key (prefixed with KeyPath) with no expiration,
// overwriting any existing value. A failure is reported as an error that
// names the full key.
func (this *RedisStateStore) Set(key string, value string) error {
	fullKey := this.KeyPath + "." + key
	if setErr := this.rds.Set(fullKey, value, 0).Err(); setErr != nil {
		return fmt.Errorf("failed to set key %s, error %v", fullKey, setErr)
	}
	return nil
}
// Get returns the value stored under key (prefixed with KeyPath).
// A missing key (redis.Nil) and any other read failure are both reported as
// errors, each with its own message; the returned string is empty on error.
func (this *RedisStateStore) Get(key string) (string, error) {
	fullKey := this.KeyPath + "." + key
	stored, getErr := this.rds.Get(fullKey).Result()
	switch {
	case getErr == redis.Nil:
		return "", fmt.Errorf("failed to get key %s, nil", fullKey)
	case getErr != nil:
		return "", fmt.Errorf("failed to get key %s, %v", fullKey, getErr)
	default:
		return stored, nil
	}
}
// Cleanup scans for every key matching KeyPath + ".*" and deletes each one
// (called only once in a request).
//
// Deletion is best-effort: a failed delete does not stop the scan. The error
// returned is the most recent one seen, with an iterator error taking
// precedence over delete errors; nil means nothing failed.
func (this *RedisStateStore) Cleanup() error {
	pattern := this.KeyPath + ".*"

	var lastErr error
	scanner := this.rds.Scan(0, pattern, 0).Iterator()
	for scanner.Next() {
		if delErr := this.rds.Del(scanner.Val()).Err(); delErr != nil {
			lastErr = delErr
		}
	}
	if scanErr := scanner.Err(); scanErr != nil {
		lastErr = scanErr
	}
	return lastErr
}