-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
105 lines (94 loc) · 2.73 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package main
import (
common "agentsc/data-proxy/common"
"agentsc/data-proxy/process"
sink "agentsc/data-proxy/sink"
source "agentsc/data-proxy/source"
"flag"
"fmt"
"log"
"net/http"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
func main() {
// read flag
var confile string
var debugMode bool
flag.StringVar(&confile, "c", "", "配置文件")
flag.BoolVar(&debugMode, "debug", false, "开启debug日志")
flag.Parse()
if confile == "" {
log.Fatal("请提供配置文件")
}
// read conf
config := common.GetConfig(confile)
common.Config = config
common.DebugMod = debugMode
common.SetDefaultConfigs(&common.Config)
// open channel
var chanBuff int = 100
if config.ChanBuff > chanBuff {
chanBuff = config.ChanBuff
}
common.PostProcessChan = make(chan *common.KeyMetricsMsg, chanBuff)
defer close(common.PostProcessChan)
common.PreProcessChan = make(chan *common.KeyMetricsMsg, chanBuff)
defer close(common.PreProcessChan)
// start sink goroutine
switch config.Sink.Type {
case "vm":
vmConf := config.VMConf[config.Sink.Id]
log.Printf("Sink: victoriametrics, %v", vmConf.ServiceUrl)
go sink.SinkToVM(vmConf)
case "pulsar":
pulsarConf := config.PulsarConf[config.Sink.Id]
log.Printf("Sink: pulsar, %v", pulsarConf.ServiceUrl)
go sink.SinkToPulsar(pulsarConf)
case "kafka":
kafkaConf := config.KafkaConf[config.Sink.Id]
log.Printf("Sink: kafka, %v", kafkaConf.Brokers)
go sink.SinkToKafka(kafkaConf)
default:
log.Fatalf("Error: invalid sink '%s'", config.Sink.Id)
}
// start process goroutine
var processor common.Processor
switch config.Process.Id {
case "", "empty":
log.Printf("Process: empty")
processor = process.EmptyProcess{}
case "label":
log.Printf("Process: label")
processor = process.LabelProcess{}
}
go processor.StartProcess()
// start source goroutine
switch config.Source.Type {
case "pulsar":
pulsarConf := config.PulsarConf[config.Source.Id]
log.Printf("Source: pulsar, %v", pulsarConf.ServiceUrl)
go source.ConsumePulsar(pulsarConf)
case "kafka":
kafkaConf := config.KafkaConf[config.Source.Id]
log.Printf("Source: kafka, %v", kafkaConf.Brokers)
go source.ConsumeKafka(kafkaConf)
case "":
log.Print("Warn: No source provided.")
default:
log.Fatalf("Error: invalid source '%s'", config.Source.Id)
}
// start remote api
for key,api := range config.RemoteApi {
switch key {
case "prometheus":
if api.Enabled {
http.HandleFunc("/write", source.PromRemoteWrite)
http.HandleFunc("/write/prometheus", source.PromRemoteWrite)
log.Printf("RemoteApi: prometheus remote-wirte api opened")
}
}
}
// start expose prometheus metrics
http.Handle("/metrics", promhttp.Handler())
http.ListenAndServe(fmt.Sprintf(":%d", config.Port), nil)
}