-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathcontroller.go
134 lines (118 loc) · 3.46 KB
/
controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
package main
import (
"time"
"github.com/abowloflrf/k8s-event-collector/config"
"github.com/abowloflrf/k8s-event-collector/receiver"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
// EventController watches Kubernetes events through an informer, puts the
// recent ones on a work queue, and has worker goroutines hand each queued
// event to the configured receivers.
type EventController struct {
	// clientset is the Kubernetes API client passed to NewEventController.
	clientset kubernetes.Interface
	// informerFactory is the shared informer factory built from clientset; Run
	// starts it.
	informerFactory informers.SharedInformerFactory
	// evInformer is the core/v1 Event informer obtained from informerFactory.
	evInformer coreinformers.EventInformer
	// stopper is closed by Stop. Run waits on the stop channel it is given
	// rather than on this field; presumably callers pass stopper to Run --
	// verify against the caller.
	stopper chan struct{}
	// informerHasSynced is set to true by Run once the informer cache has synced.
	informerHasSynced bool
	// targets are the receivers each queued event is offered to.
	targets []receiver.Receiver
	// queue holds the events accepted by enqueue until a worker processes them.
	queue workqueue.Interface
}
// enqueue logs an observed event and, when the event is recent enough, adds it
// to the work queue for delivery to the receivers.
//
// e is the event delivered by the informer. handleType is a label used only in
// the log line (the handlers in this file pass "ADD" or "UPDATE").
//
// Events older than five seconds are skipped so that the backlog replayed by
// the informer right after start-up is not forwarded again. The age is taken
// from LastTimestamp. When that field is unset -- as it is for events written
// through the newer events API, which fill EventTime and
// Series.LastObservedTime instead -- the age falls back to those fields so such
// events are not mistaken for ancient ones and silently dropped. If none of
// the timestamps is set the event is still treated as old and skipped.
func (ec *EventController) enqueue(e *corev1.Event, handleType string) {
	// maxAge is how old an event may be and still be forwarded.
	const maxAge = 5 * time.Second

	lastSeen := e.LastTimestamp.Time
	if lastSeen.IsZero() {
		switch {
		case e.Series != nil && !e.Series.LastObservedTime.Time.IsZero():
			lastSeen = e.Series.LastObservedTime.Time
		case !e.EventTime.Time.IsZero():
			lastSeen = e.EventTime.Time
		}
	}

	// Compute the age once so the logged value and the filtered value agree.
	age := time.Since(lastSeen)
	logrus.Infof("event %s [%s][%s/%s][%s], last since %v", handleType, string(e.UID), e.InvolvedObject.Namespace, e.InvolvedObject.Name, e.Reason, age)

	// prevent old events being handled when controller just start
	if age > maxAge {
		return
	}
	logrus.Infof("event to send [%s]", string(e.UID))
	ec.queue.Add(e)
}
// worker drains the queue: it calls processNextItem repeatedly and returns as
// soon as one of those calls reports false.
func (ec *EventController) worker() {
	for {
		if more := ec.processNextItem(); !more {
			return
		}
	}
}
// processNextItem takes one item off the queue and offers it to every
// configured receiver.
//
// It returns false only when the queue's Get reports that it is quitting,
// which makes worker stop. In every other case it returns true, including when
// a receiver fails to send: that failure is logged and the remaining receivers
// are still tried.
func (ec *EventController) processNextItem() bool {
	obj, shuttingDown := ec.queue.Get()
	if shuttingDown {
		return false
	}
	ev := obj.(*corev1.Event)
	// Tell the queue this item is finished once all receivers have been tried.
	defer ec.queue.Done(obj)

	for i := range ec.targets {
		target := ec.targets[i]
		// Only receivers whose filter accepts the event get it.
		if target.Filter(ev) {
			if sendErr := target.Send(ev); sendErr != nil {
				logrus.Errorf("send event to [%s] error: %v", target.Name(), sendErr)
			}
		}
	}
	return true
}
// Run starts the informers, waits for the event informer's cache to sync,
// launches the worker goroutines and then blocks until stop is closed.
//
// workers is the number of worker goroutines to start; stop is the channel
// whose closing ends the informers, the workers and Run itself. If the cache
// sync does not succeed, Run logs an error and returns without starting any
// worker. The queue is shut down whenever Run returns.
func (ec *EventController) Run(workers int, stop <-chan struct{}) {
	logrus.Info("starting event controller")
	// Deferred calls run in reverse order: the queue is shut down first, then
	// the stop message is logged.
	defer logrus.Info("stopping event controller")
	defer ec.queue.ShutDown()

	ec.informerFactory.Start(stop)

	synced := cache.WaitForCacheSync(stop, ec.evInformer.Informer().HasSynced)
	if !synced {
		logrus.Error("wait for cache sync error")
		return
	}
	ec.informerHasSynced = true
	logrus.Info("informer cache synced, controller started")

	// Each worker is re-run one second after it returns, until stop is closed.
	for remaining := workers; remaining > 0; remaining-- {
		go wait.Until(ec.worker, time.Second, stop)
	}

	<-stop
}
// addHandlers registers the informer callbacks: added events are passed to
// enqueue labelled "ADD", updated events (the new object only) labelled
// "UPDATE", and deletions are ignored.
func (ec *EventController) addHandlers() {
	onAdd := func(obj interface{}) {
		ec.enqueue(obj.(*corev1.Event), "ADD")
	}
	onUpdate := func(_, newObj interface{}) {
		ec.enqueue(newObj.(*corev1.Event), "UPDATE")
	}
	onDelete := func(interface{}) {
		// Nothing to do
	}

	handlers := cache.ResourceEventHandlerFuncs{
		AddFunc:    onAdd,
		UpdateFunc: onUpdate,
		DeleteFunc: onDelete,
	}
	ec.evInformer.Informer().AddEventHandler(handlers)
}
// Stop closes the controller's stopper channel, signalling everything that
// receives from it.
//
// Calling Stop again after the channel is closed, or on a controller whose
// stopper was never initialised, is a no-op instead of a panic. The
// closed-check and the close are not performed atomically, so Stop must not be
// called from several goroutines at the same time.
func (ec *EventController) Stop() {
	if ec.stopper == nil {
		return
	}
	select {
	case <-ec.stopper:
		// Already closed (nothing in this file ever sends on stopper, so a
		// successful receive means a previous Stop closed it).
	default:
		close(ec.stopper)
	}
}
// NewEventController builds an EventController on top of the given clientset.
//
// It creates a shared informer factory (resync period 0) and its core/v1 Event
// informer, loads the receivers enabled in config.C.Receivers, creates the
// work queue and registers the informer handlers. The returned controller does
// nothing until Run is called.
//
// A receiver that fails to initialise is logged and left out; the controller
// is still returned with whichever receivers loaded successfully.
func NewEventController(cs *kubernetes.Clientset) *EventController {
	factory := informers.NewSharedInformerFactory(cs, 0)
	evInformer := factory.Core().V1().Events()

	// elasticsearch / stdout
	var targets []receiver.Receiver
	if config.C.Receivers.ElasticSearch != nil {
		target, err := receiver.NewElasticsearchTarget(config.C.Receivers.ElasticSearch)
		if err != nil {
			logrus.Errorf("create receiver error: %v", err)
		} else {
			targets = append(targets, target)
			logrus.Infof("receiver loaded, %s", target.Name())
		}
	}
	if config.C.Receivers.Stdout {
		// Handle the error like the Elasticsearch branch does, so a failed
		// constructor cannot put an unusable target into the list.
		target, err := receiver.NewStdoutTarget()
		if err != nil {
			logrus.Errorf("create receiver error: %v", err)
		} else {
			targets = append(targets, target)
			logrus.Infof("receiver loaded, %s", target.Name())
		}
	}

	ec := &EventController{
		clientset:       cs,
		informerFactory: factory,
		evInformer:      evInformer,
		stopper:         make(chan struct{}),
		targets:         targets,
		queue:           workqueue.New(),
	}
	ec.addHandlers()
	return ec
}