-
Notifications
You must be signed in to change notification settings - Fork 42
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add NATS publisher support to reminder #4829
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,7 @@ | ||
// SPDX-FileCopyrightText: Copyright 2024 The Minder Authors | ||
// SPDX-License-Identifier: Apache-2.0 | ||
|
||
// Package nats provides a nants+cloudevents implementation of the eventer interface | ||
// Package nats provides a nats+cloudevents implementation of the eventer interface | ||
package nats | ||
|
||
import ( | ||
|
@@ -15,23 +15,28 @@ import ( | |
cejsm "github.com/cloudevents/sdk-go/protocol/nats_jetstream/v2" | ||
cloudevents "github.com/cloudevents/sdk-go/v2" | ||
"github.com/nats-io/nats.go" | ||
"github.com/nats-io/nats.go/jetstream" | ||
"github.com/rs/zerolog" | ||
|
||
"github.com/mindersec/minder/internal/events/common" | ||
serverconfig "github.com/mindersec/minder/pkg/config/server" | ||
"github.com/mindersec/minder/pkg/config" | ||
) | ||
|
||
// BuildNatsChannelDriver creates a new event driver using | ||
// CloudEvents with the NATS-JetStream transport | ||
func BuildNatsChannelDriver(cfg *serverconfig.EventConfig) (message.Publisher, message.Subscriber, common.DriverCloser, error) { | ||
adapter := &cloudEventsNatsAdapter{cfg: &cfg.Nats} | ||
func BuildNatsChannelDriver(cfg *config.NatsConfig) (message.Publisher, message.Subscriber, common.DriverCloser, error) { | ||
if cfg == nil { | ||
return nil, nil, nil, fmt.Errorf("NATS config is nil") | ||
} | ||
|
||
adapter := &cloudEventsNatsAdapter{cfg: cfg} | ||
return adapter, adapter, func() {}, nil | ||
} | ||
|
||
// CloudEventsNatsPublisher actually consumes a _set_ of NATS topics, | ||
// because CloudEvents-Jetstream has a separate Consumer for each topic | ||
type cloudEventsNatsAdapter struct { | ||
cfg *serverconfig.NatsConfig | ||
cfg *config.NatsConfig | ||
lock sync.Mutex | ||
// Keep a cache of the topics we subscribe/publish to | ||
topics map[string]topicState | ||
|
@@ -111,25 +116,21 @@ func (c *cloudEventsNatsAdapter) ensureTopic(ctx context.Context, topic string, | |
return &state, nil | ||
} | ||
|
||
func (c *cloudEventsNatsAdapter) ensureStream(_ context.Context) error { | ||
func (c *cloudEventsNatsAdapter) ensureStream(ctx context.Context) error { | ||
conn, err := nats.Connect(c.cfg.URL) | ||
if err != nil { | ||
return err | ||
} | ||
defer conn.Close() | ||
js, err := conn.JetStream() | ||
js, err := jetstream.New(conn) | ||
if err != nil { | ||
return err | ||
} | ||
si, err := js.StreamInfo(c.cfg.Prefix) | ||
if si == nil || err != nil && err.Error() == "stream not found" { | ||
_, err = js.AddStream(&nats.StreamConfig{ | ||
Name: c.cfg.Prefix, | ||
Subjects: []string{c.cfg.Prefix + ".>"}, | ||
}) | ||
return err | ||
} | ||
return nil | ||
_, err = js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{ | ||
Name: c.cfg.Prefix, | ||
Subjects: []string{c.cfg.Prefix + ".>"}, | ||
}) | ||
return err | ||
Comment on lines
+129
to
+133
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Newer jetstream API, cleaner code, context support. https://github.com/nats-io/nats.go/blob/main/jetstream/README.md There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 if this works; I think when I started looking, this wasn't cleaned up / operable with CloudEvents yet. |
||
} | ||
|
||
// Subscribe implements message.Subscriber. | ||
|
@@ -188,13 +189,13 @@ func (c *cloudEventsNatsAdapter) Publish(topic string, messages ...*message.Mess | |
|
||
state, err := c.ensureTopic(ctx, subject, "sender") // subject) | ||
if err != nil { | ||
return fmt.Errorf("Error creating topic %q: %w", subject, err) | ||
return fmt.Errorf("error creating topic %q: %w", subject, err) | ||
} | ||
|
||
for _, msg := range messages { | ||
err := sendEvent(ctx, subject, state.ceClient, msg) | ||
if err != nil { | ||
return fmt.Errorf("Error sending event to %q: %w", subject, err) | ||
return fmt.Errorf("error sending event to %q: %w", subject, err) | ||
} | ||
} | ||
return nil | ||
|
@@ -211,7 +212,7 @@ func sendEvent( | |
// All our current payloads are encoded JSON; we need to unmarshal | ||
payload := map[string]any{} | ||
if err := json.Unmarshal(msg.Payload, &payload); err != nil { | ||
return fmt.Errorf("Error unmarshalling payload: %w", err) | ||
return fmt.Errorf("error unmarshalling payload: %w", err) | ||
} | ||
|
||
err := event.SetData("application/json", payload) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,12 +13,33 @@ import ( | |
"github.com/rs/zerolog" | ||
|
||
"github.com/mindersec/minder/internal/events/common" | ||
natsinternal "github.com/mindersec/minder/internal/events/nats" | ||
"github.com/mindersec/minder/pkg/eventer/constants" | ||
) | ||
|
||
func (r *reminder) getMessagePublisher(ctx context.Context) (message.Publisher, common.DriverCloser, error) { | ||
switch r.cfg.EventConfig.Driver { | ||
case constants.NATSDriver: | ||
return r.setupNATSPublisher(ctx) | ||
case constants.SQLDriver: | ||
return r.setupSQLPublisher(ctx) | ||
default: | ||
return nil, nil, fmt.Errorf("unknown publisher type: %s", r.cfg.EventConfig.Driver) | ||
} | ||
} | ||
Comment on lines
+20
to
+29
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm nervous about having this diverge from (I need to do some refactoring in that package to move |
||
|
||
func (r *reminder) setupNATSPublisher(_ context.Context) (message.Publisher, common.DriverCloser, error) { | ||
pub, _, cl, err := natsinternal.BuildNatsChannelDriver(&r.cfg.EventConfig.NatsConfig) | ||
if err != nil { | ||
return nil, nil, fmt.Errorf("failed to create NATS publisher: %w", err) | ||
} | ||
return pub, cl, nil | ||
} | ||
|
||
func (r *reminder) setupSQLPublisher(ctx context.Context) (message.Publisher, common.DriverCloser, error) { | ||
logger := zerolog.Ctx(ctx) | ||
|
||
db, _, err := r.cfg.EventConfig.Connection.GetDBConnection(ctx) | ||
db, _, err := r.cfg.EventConfig.SQLPubConfig.Connection.GetDBConnection(ctx) | ||
if err != nil { | ||
return nil, nil, fmt.Errorf("unable to connect to events database: %w", err) | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -149,3 +149,14 @@ func ReadKey(keypath string) ([]byte, error) { | |
|
||
return data, nil | ||
} | ||
|
||
// NatsConfig is the configuration when using NATS as the event driver | ||
type NatsConfig struct { | ||
// URL is the URL for the NATS server | ||
URL string `mapstructure:"url" default:"nats://localhost:4222"` | ||
// Prefix is the prefix for the NATS subjects to subscribe to | ||
Prefix string `mapstructure:"prefix" default:"minder"` | ||
// Queue is the name of the queue group to join when consuming messages | ||
// queue groups allow multiple process to round-robin process messages. | ||
Queue string `mapstructure:"queue" default:"minder"` | ||
} | ||
Comment on lines
+152
to
+162
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we want to move this up to |
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What do you think of moving the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All the other constructors take the entire
cfg
, so that's the model I followed here.