-
Notifications
You must be signed in to change notification settings - Fork 4
/
registry.go
122 lines (109 loc) · 2.44 KB
/
registry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package registry
import (
"github.com/hashicorp/consul/api"
"net/url"
"strconv"
"time"
)
var args []string
// RegistryPrefix represents the name of the meta field of config
const RegistryPrefix = "registry"
// New registry contoller interface
func New(dsn string, osArgs []string) (Registry, error) {
args = osArgs
url, err := url.Parse(dsn)
if err != nil {
return nil, err
}
client, err := api.NewClient(&api.Config{
Scheme: url.Scheme,
Address: url.Host,
Datacenter: url.Query().Get("dc"),
Token: url.Query().Get("token"),
})
if err != nil {
return nil, err
}
registry := registry{
client: client,
datacenter: url.Query().Get("dc"),
refreshInterval: 30 * time.Second,
bindChan: make(chan struct{}),
}
if interval := url.Query().Get("refresh_interval"); len(interval) != 0 {
if v, err := strconv.ParseInt(interval, 10, 64); err == nil && v > 0 {
registry.refreshInterval = time.Duration(v) * time.Second
}
}
if len(dsn) != 0 {
go registry.supervisor()
} else {
registry.refreshInterval = -1
}
return ®istry, nil
}
type registry struct {
client *api.Client
configs []config
refreshInterval time.Duration
datacenter string
bindChan chan struct{}
}
func (r *registry) KV() KV {
return &kv{client: r.client.KV()}
}
func (r *registry) Discovery() Discovery {
return &discovery{
agent: r.client.Agent(),
health: r.client.Health(),
catalog: r.client.Catalog(),
datacenter: r.datacenter,
}
}
func (r *registry) supervisor() {
var (
refresh = time.Tick(r.refreshInterval)
)
for {
select {
case <-refresh:
r.Refresh()
case <-r.bindChan:
r.Refresh()
}
}
}
func (r *registry) Refresh() {
var (
kv = r.KV()
keys []string
)
for _, config := range r.configs {
for _, item := range config.items {
keys = append(keys, item.key)
}
}
values := make(map[string]string, len(keys))
for _, key := range keys {
if v, err := kv.Get(key); err == nil {
values[key] = v
}
}
for _, config := range r.configs {
var updatedItemKeys []string
config.rawConfig.Lock()
for _, item := range config.items {
value, ok := values[item.key]
if !ok {
continue
}
if item.equal(value) {
continue
}
item.set(value)
updatedItemKeys = append(updatedItemKeys, item.path)
}
config.rawConfig.Unlock()
config.callOnUpdatedMethod(updatedItemKeys) // Call method "OnUpdate<variableName>" if exists
}
}