Skip to content

Commit d9dbfa4

Browse files
committed
MFW-6184 Added new publsiher
1 parent d66505a commit d9dbfa4

File tree

3 files changed

+215
-1
lines changed

3 files changed

+215
-1
lines changed

.vscode/settings.json

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
{
2+
"workbench.colorCustomizations": {
3+
"activityBar.activeBackground": "#d5e5ea",
4+
"activityBar.background": "#d5e5ea",
5+
"activityBar.foreground": "#15202b",
6+
"activityBar.inactiveForeground": "#15202b99",
7+
"activityBarBadge.background": "#bf80b0",
8+
"activityBarBadge.foreground": "#15202b",
9+
"commandCenter.border": "#15202b99",
10+
"sash.hoverBorder": "#d5e5ea",
11+
"statusBar.background": "#b3d0d9",
12+
"statusBar.foreground": "#15202b",
13+
"statusBarItem.hoverBackground": "#91bbc8",
14+
"statusBarItem.remoteBackground": "#b3d0d9",
15+
"statusBarItem.remoteForeground": "#15202b",
16+
"titleBar.activeBackground": "#b3d0d9",
17+
"titleBar.activeForeground": "#15202b",
18+
"titleBar.inactiveBackground": "#b3d0d999",
19+
"titleBar.inactiveForeground": "#15202b99"
20+
},
21+
"peacock.color": "#b3d0d9"
22+
}

services/alerts/publish.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,12 +136,13 @@ func (publisher *ZmqAlertPublisher) zmqPublisher() {
136136
for {
137137
select {
138138
case msg := <-publisher.messagePublisherChannel:
139+
publisher.logger.Info("topic: %s\nmessage: %s\n", msg.Topic, msg.Message)
139140
sentBytes, err := socket.SendMessage(msg.Topic, msg.Message)
140141
if err != nil {
141142
publisher.logger.Err("Publisher Send error: %s\n", err)
142143
continue
143144
}
144-
publisher.logger.Debug("Message sent: %v bytes\n", sentBytes)
145+
publisher.logger.Info("Message sent: %v bytes\n", sentBytes)
145146
case <-publisher.zmqPublisherShutdown:
146147
publisher.logger.Info("ZMQ Publisher shutting down\n")
147148
return

services/publisher/event_publish.go

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
package publisher
2+
3+
import (
4+
"sync"
5+
"sync/atomic"
6+
"time"
7+
8+
loggerModel "github.com/untangle/golang-shared/logger"
9+
10+
zmq "github.com/pebbe/zmq4"
11+
"github.com/untangle/golang-shared/structs/protocolbuffers/Alerts"
12+
"google.golang.org/protobuf/proto"
13+
)
14+
15+
///b
16+
17+
// AlertZMQTopic Topic name to be used when sending alerts.
18+
const AlertZMQTopic string = "arista:reportd:events"
19+
20+
const PublisherSocketAddress = "ipc:///var/zmq_event_publisher"
21+
const SubscriberSocketAddress = "ipc:///var/zmq_event_subscriber"
22+
23+
// ZmqMessage is a message sent over a zmq bus for us to consume.
24+
type ZmqMessage struct {
25+
Topic string
26+
Message []byte
27+
}
28+
29+
const messageBuffer = 1000
30+
31+
var alertPublisherSingleton *ZmqAlertPublisher
32+
var once sync.Once
33+
34+
type AlertPublisher interface {
35+
Send(alert *Alerts.Alert)
36+
}
37+
38+
// ZmqAlertPublisher runs a ZMQ publisher socket in the background.
39+
// When the Send method is called the alert is passed down to the
40+
// ZMQ socket using a chanel and the message is published to ZMQ
41+
// using the alert specific topic.
42+
type ZmqAlertPublisher struct {
43+
logger loggerModel.LoggerLevels
44+
messagePublisherChannel chan ZmqMessage
45+
zmqPublisherShutdown chan bool
46+
zmqPublisherStarted chan int32
47+
socketAddress string
48+
started int32
49+
}
50+
51+
func (publisher *ZmqAlertPublisher) Name() string {
52+
return "Alert publisher"
53+
}
54+
55+
// NewZmqAlertPublisher Gets the singleton instance of ZmqAlertPublisher.
56+
func NewZmqEventPublisher(logger loggerModel.LoggerLevels) *ZmqAlertPublisher {
57+
once.Do(func() {
58+
alertPublisherSingleton = &ZmqAlertPublisher{
59+
logger: logger,
60+
messagePublisherChannel: make(chan ZmqMessage, messageBuffer),
61+
zmqPublisherShutdown: make(chan bool),
62+
zmqPublisherStarted: make(chan int32, 1),
63+
socketAddress: PublisherSocketAddress,
64+
}
65+
})
66+
67+
return alertPublisherSingleton
68+
}
69+
70+
func NewDefaultEventPublisher(logger loggerModel.LoggerLevels) AlertPublisher {
71+
return NewZmqEventPublisher(logger)
72+
}
73+
74+
// Startup starts the ZMQ publisher in the background.
75+
func (publisher *ZmqAlertPublisher) Startup() error {
76+
publisher.logger.Info("Starting up the Alerts service\n")
77+
78+
// Make sure it is not started twice.
79+
if atomic.LoadInt32(&publisher.started) > 0 {
80+
publisher.logger.Debug("Alerts service is already running.\n")
81+
return nil
82+
}
83+
84+
go publisher.zmqPublisher()
85+
86+
// Blocks until the publisher starts.
87+
atomic.AddInt32(&publisher.started, <-publisher.zmqPublisherStarted)
88+
89+
return nil
90+
}
91+
92+
// Shutdown stops the goroutine running the ZMQ subscriber and closes the channels used in the service.
93+
func (publisher *ZmqAlertPublisher) Shutdown() error {
94+
publisher.logger.Info("Shutting down the Alerts service\n")
95+
96+
// Make sure it is not shutdown twice.
97+
if atomic.LoadInt32(&publisher.started) == 0 {
98+
publisher.logger.Debug("Alerts service is already shutdown.\n")
99+
return nil
100+
}
101+
102+
publisher.zmqPublisherShutdown <- true
103+
close(publisher.zmqPublisherShutdown)
104+
close(publisher.zmqPublisherStarted)
105+
close(publisher.messagePublisherChannel)
106+
atomic.StoreInt32(&publisher.started, 0)
107+
108+
return nil
109+
}
110+
111+
// Send publishes the alert to on the ZMQ publishing socket.
112+
func (publisher *ZmqAlertPublisher) Send(alert *Alerts.Alert) {
113+
// Make sure it is not shutdown.
114+
if atomic.LoadInt32(&publisher.started) == 0 {
115+
publisher.logger.Debug("Alerts service has been shutdown.\n")
116+
return
117+
}
118+
119+
// 2 reasons to set the timestamp here:
120+
// - the caller isn't responsible for setting the timestamp so we just need to set it in one place (here)
121+
// - we set it before putting it in queue, which means we have the timestamp of the alert creation, not the timestamp when it was processed
122+
alert.Datetime = time.Now().Unix()
123+
124+
publisher.logger.Debug("Publish alert %v\n", alert)
125+
alertMessage, err := proto.Marshal(alert)
126+
if err != nil {
127+
publisher.logger.Err("Unable to marshal alert entry: %s\n", err)
128+
return
129+
}
130+
131+
publisher.messagePublisherChannel <- ZmqMessage{Topic: AlertZMQTopic, Message: alertMessage}
132+
publisher.logger.Info("Alert published\n")
133+
}
134+
135+
// zmqPublisher initializes a ZMQ publishing socket and listens on the
136+
// messagePublisherChannel. The received messages are published to the
137+
// ZMQ publisher socket.
138+
//
139+
// This method should be run as a goroutine. The goroutine can be stopped
140+
// by sending a message on the zmqPublisherShutdown channel.
141+
func (publisher *ZmqAlertPublisher) zmqPublisher() {
142+
socket, err := publisher.setupZmqPubSocket()
143+
if err != nil {
144+
publisher.logger.Warn("Unable to setup ZMQ publisher socket: %s\n", err)
145+
return
146+
}
147+
defer socket.Close()
148+
149+
publisher.zmqPublisherStarted <- 1
150+
151+
for {
152+
select {
153+
case msg := <-publisher.messagePublisherChannel:
154+
publisher.logger.Info("topic: %s\nmessage: %s\n", msg.Topic, msg.Message)
155+
sentBytes, err := socket.SendMessage(msg.Topic, msg.Message)
156+
if err != nil {
157+
publisher.logger.Err("Publisher Send error: %s\n", err)
158+
continue
159+
}
160+
publisher.logger.Info("Message sent: %v bytes\n", sentBytes)
161+
case <-publisher.zmqPublisherShutdown:
162+
publisher.logger.Info("ZMQ Publisher shutting down\n")
163+
return
164+
}
165+
}
166+
}
167+
168+
// setupZmqPubSocket sets up the ZMQ socket for publishing alerts
169+
func (publisher *ZmqAlertPublisher) setupZmqPubSocket() (soc *zmq.Socket, err error) {
170+
publisher.logger.Info("Setting up Alerts ZMQ publisher socket...\n")
171+
172+
socket, err := zmq.NewSocket(zmq.PUB)
173+
if err != nil {
174+
publisher.logger.Err("Unable to open ZMQ publisher socket: %s\n", err)
175+
return nil, err
176+
}
177+
178+
if err = socket.SetLinger(0); err != nil {
179+
publisher.logger.Err("Unable to SetLinger on ZMQ publisher socket: %s\n", err)
180+
return nil, err
181+
}
182+
183+
if err = socket.Connect(publisher.socketAddress); err != nil {
184+
publisher.logger.Err("Unable to bind to ZMQ socket: %s\n", err)
185+
return nil, err
186+
}
187+
188+
publisher.logger.Info("Alerts ZMQ Publisher started!\n")
189+
190+
return socket, nil
191+
}

0 commit comments

Comments
 (0)