-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
215 lines (184 loc) · 5.33 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
package main
import (
"encoding/json"
"flag"
"fmt"
"github.com/GeoNet/impact"
"github.com/GeoNet/mseed"
"github.com/GeoNet/slink"
"github.com/crowdmob/goamz/aws"
"github.com/crowdmob/goamz/sqs"
"log"
"os"
"strings"
"time"
)
func main() {
var Q *sqs.Queue
// runtime settings
var verbose bool
flag.BoolVar(&verbose, "verbose", false, "make noise")
var dryrun bool
flag.BoolVar(&dryrun, "dry-run", false, "don't actually send the messages")
// streaming channel information
var config string
flag.StringVar(&config, "config", "impact.json", "provide a streams config file")
// amazon queue details
var region string
flag.StringVar(®ion, "region", "", "provide AWS region, overides env variable \"AWS_REGION\"")
var queue string
flag.StringVar(&queue, "queue", "", "send messages to the SQS queue, overides env variable \"AWS_QUEUE\"")
var key string
flag.StringVar(&key, "key", "", "AWS access key id, overrides env and credentials file (default profile)")
var secret string
flag.StringVar(&secret, "secret", "", "AWS secret key id, overrides env and credentials file (default profile)")
// seedlink options
var netdly int
flag.IntVar(&netdly, "netdly", 0, "provide network delay")
var netto int
flag.IntVar(&netto, "netto", 300, "provide network timeout")
var keepalive int
flag.IntVar(&keepalive, "keepalive", 0, "provide keep-alive")
var selectors string
flag.StringVar(&selectors, "selectors", "???", "provide channel selectors")
var streams string
flag.StringVar(&streams, "streams", "*_*", "provide streams")
// heartbeat flush interval
var flush time.Duration
flag.DurationVar(&flush, "flush", 300.0*time.Second, "how often to send heartbeat messages")
// noisy channel detection
var probation time.Duration
flag.DurationVar(&probation, "probation", 10.0*time.Minute, "noise probation window")
var level int
flag.IntVar(&level, "level", 2, "noise threshold level")
// problem sending messages
var resends int
flag.IntVar(&resends, "resends", 6, "how many times to try and resend a message")
var wait time.Duration
flag.DurationVar(&wait, "wait", 5*time.Second, "how long to wait between message resends")
flag.Parse()
if !dryrun {
if region == "" {
region = os.Getenv("AWS_IMPACT_REGION")
if region == "" {
log.Fatalf("unable to find region in environment or command line [AWS_IMPACT_REGION]")
}
}
if queue == "" {
queue = os.Getenv("AWS_IMPACT_QUEUE")
if queue == "" {
log.Fatalf("unable to find queue in environment or command line [AWS_IMPACT_QUEUE]")
}
}
// configure amazon ...
R := aws.GetRegion(region)
// fall through to env then credentials file
A, err := aws.GetAuth(key, secret, "", time.Now().Add(30*time.Minute))
if err != nil {
log.Fatalf("unable to get amazon auth: %s\n", err)
}
S := sqs.New(A, R)
Q, err = S.GetQueue(queue)
if err != nil {
log.Fatalf("unable to get amazon queue: %s [%s/%s]\n", err, queue, region)
}
}
// load json file
state := impact.LoadStreams(config)
if state == nil {
log.Println("unable to parse config file: ", config)
os.Exit(1)
}
// initial stream setup
for s := range state {
_, err := state[s].Init(s, probation, (int32)(level))
if err != nil {
log.Fatalf("unable to get initial state: %s\n", err)
}
}
// who to call ...
server := "localhost:18000"
if flag.NArg() > 0 {
server = flag.Arg(0)
}
// initial seedlink handle
slconn := slink.NewSLCD()
defer slink.FreeSLCD(slconn)
// seedlink settings
slconn.SetNetDly(netdly)
slconn.SetNetTo(netto)
slconn.SetKeepAlive(keepalive)
// conection
slconn.SetSLAddr(server)
defer slconn.Disconnect()
// configure streams selectors to recover
slconn.ParseStreamList(streams, selectors)
// make space for miniseed blocks
msr := mseed.NewMSRecord()
defer mseed.FreeMSRecord(msr)
// fixup stream code for messaging
replace := strings.NewReplacer("_", ".")
// output channel
result := make(chan impact.Message)
go func() {
for m := range result {
mm, err := json.Marshal(m)
if err != nil {
log.Printf("unable to marshal message: %s\n", err)
continue
}
if verbose {
fmt.Println(string(mm))
}
if !dryrun {
for n := 0; n < resends; n++ {
_, err := Q.SendMessage(string(mm))
if err != nil {
log.Printf("unable to send message [#%d/%d]: %s\n", n+1, resends, err)
log.Printf("sleeping %s\n", wait)
time.Sleep(wait)
}
break
}
}
}
}()
for {
// recover packet ...
p, rc := slconn.Collect()
if rc != slink.SLPACKET {
break
}
// just in case we're shutting down
if p.PacketType() != slink.SLDATA {
continue
}
// decode miniseed block
buf := p.GetMSRecord()
msr.Unpack(buf, 512, 1, 0)
// what to send
source := strings.TrimRight(msr.Network()+"."+msr.Station(), "\u0000")
// get lookup key
srcname := msr.SrcName(0)
stream, ok := state[srcname]
if ok == false {
continue
}
// recover amplitude samples
samples, err := msr.DataSamples()
if err != nil {
log.Printf("data sample problem! %s\n", err)
continue
}
// process each block into a message
message, err := stream.ProcessSamples(replace.Replace(source), srcname, msr.Starttime(), samples)
if err != nil {
log.Printf("data processing problem! %s\n", err)
continue
}
// should we send a message
if stream.Flush(flush, message.MMI) {
result <- message
}
}
}