Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add NATS publisher support to reminder #4829

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions config/reminder-config.yaml.example
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,17 @@ logging:
level: "debug"

events:
sql_connection:
dbhost: "watermill-postgres"
dbport: 5432
dbuser: postgres
dbpass: postgres
dbname: watermill
sslmode: disable
driver: "sql"
# driver: "cloudevents-nats"
sql:
connection:
dbhost: "watermill-postgres"
dbport: 5432
dbuser: postgres
dbpass: postgres
dbname: watermill
sslmode: disable
# nats:
# url: "nats://localhost:4222"
# prefix: "minder"
# queue: "minder"
5 changes: 1 addition & 4 deletions internal/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,6 @@ func NewEventer(ctx context.Context, _ openfeature.IClient, cfg *serverconfig.Ev
if cfg == nil {
return nil, errors.New("event config is nil")
}
if cfg == nil {
return nil, errors.New("event config is nil")
}

l := zerowater.NewZerologLoggerAdapter(
zerolog.Ctx(ctx).With().Str("component", "watermill").Logger())
Expand Down Expand Up @@ -153,7 +150,7 @@ func instantiateDriver(
return eventersql.BuildPostgreSQLDriver(ctx, cfg)
case constants.NATSDriver:
zerolog.Ctx(ctx).Info().Msg("Using NATS driver")
return nats.BuildNatsChannelDriver(cfg)
return nats.BuildNatsChannelDriver(&cfg.Nats)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the other constructors take the entire cfg, so that's the model I followed here.

default:
zerolog.Ctx(ctx).Info().Msg("Driver unknown")
return nil, nil, nil, fmt.Errorf("unknown driver %s", driver)
Expand Down
39 changes: 20 additions & 19 deletions internal/events/nats/natschannel.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// SPDX-FileCopyrightText: Copyright 2024 The Minder Authors
// SPDX-License-Identifier: Apache-2.0

// Package nats provides a nants+cloudevents implementation of the eventer interface
// Package nats provides a nats+cloudevents implementation of the eventer interface
package nats

import (
Expand All @@ -15,23 +15,28 @@ import (
cejsm "github.com/cloudevents/sdk-go/protocol/nats_jetstream/v2"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"github.com/rs/zerolog"

"github.com/mindersec/minder/internal/events/common"
serverconfig "github.com/mindersec/minder/pkg/config/server"
"github.com/mindersec/minder/pkg/config"
)

// BuildNatsChannelDriver creates a new event driver using
// CloudEvents with the NATS-JetStream transport
func BuildNatsChannelDriver(cfg *serverconfig.EventConfig) (message.Publisher, message.Subscriber, common.DriverCloser, error) {
adapter := &cloudEventsNatsAdapter{cfg: &cfg.Nats}
func BuildNatsChannelDriver(cfg *config.NatsConfig) (message.Publisher, message.Subscriber, common.DriverCloser, error) {
if cfg == nil {
return nil, nil, nil, fmt.Errorf("NATS config is nil")
}

adapter := &cloudEventsNatsAdapter{cfg: cfg}
return adapter, adapter, func() {}, nil
}

// CloudEventsNatsPublisher actually consumes a _set_ of NATS topics,
// because CloudEvents-Jetstream has a separate Consumer for each topic
type cloudEventsNatsAdapter struct {
cfg *serverconfig.NatsConfig
cfg *config.NatsConfig
lock sync.Mutex
// Keep a cache of the topics we subscribe/publish to
topics map[string]topicState
Expand Down Expand Up @@ -111,25 +116,21 @@ func (c *cloudEventsNatsAdapter) ensureTopic(ctx context.Context, topic string,
return &state, nil
}

func (c *cloudEventsNatsAdapter) ensureStream(_ context.Context) error {
func (c *cloudEventsNatsAdapter) ensureStream(ctx context.Context) error {
conn, err := nats.Connect(c.cfg.URL)
if err != nil {
return err
}
defer conn.Close()
js, err := conn.JetStream()
js, err := jetstream.New(conn)
if err != nil {
return err
}
si, err := js.StreamInfo(c.cfg.Prefix)
if si == nil || err != nil && err.Error() == "stream not found" {
_, err = js.AddStream(&nats.StreamConfig{
Name: c.cfg.Prefix,
Subjects: []string{c.cfg.Prefix + ".>"},
})
return err
}
return nil
_, err = js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
Name: c.cfg.Prefix,
Subjects: []string{c.cfg.Prefix + ".>"},
})
return err
Comment on lines +129 to +133
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Newer jetstream API, cleaner code, context support. https://github.com/nats-io/nats.go/blob/main/jetstream/README.md

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 if this works; I think when I started looking, this wasn't cleaned up / operable with CloudEvents yet.

}

// Subscribe implements message.Subscriber.
Expand Down Expand Up @@ -188,13 +189,13 @@ func (c *cloudEventsNatsAdapter) Publish(topic string, messages ...*message.Mess

state, err := c.ensureTopic(ctx, subject, "sender") // subject)
if err != nil {
return fmt.Errorf("Error creating topic %q: %w", subject, err)
return fmt.Errorf("error creating topic %q: %w", subject, err)
}

for _, msg := range messages {
err := sendEvent(ctx, subject, state.ceClient, msg)
if err != nil {
return fmt.Errorf("Error sending event to %q: %w", subject, err)
return fmt.Errorf("error sending event to %q: %w", subject, err)
}
}
return nil
Expand All @@ -211,7 +212,7 @@ func sendEvent(
// All our current payloads are encoded JSON; we need to unmarshal
payload := map[string]any{}
if err := json.Unmarshal(msg.Payload, &payload); err != nil {
return fmt.Errorf("Error unmarshalling payload: %w", err)
return fmt.Errorf("error unmarshalling payload: %w", err)
}

err := event.SetData("application/json", payload)
Expand Down
5 changes: 3 additions & 2 deletions internal/events/nats/natschannel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
natsserver "github.com/nats-io/nats-server/v2/test"

"github.com/mindersec/minder/internal/events/common"
"github.com/mindersec/minder/pkg/config"
serverconfig "github.com/mindersec/minder/pkg/config/server"
)

Expand All @@ -27,7 +28,7 @@ func TestNatsChannel(t *testing.T) {
}
defer server.Shutdown()
cfg := serverconfig.EventConfig{
Nats: serverconfig.NatsConfig{
Nats: config.NatsConfig{
URL: server.ClientURL(),
Prefix: "test",
Queue: "minder",
Expand Down Expand Up @@ -120,7 +121,7 @@ loop:
}

func buildDriverPair(ctx context.Context, cfg serverconfig.EventConfig) (message.Publisher, message.Subscriber, common.DriverCloser, <-chan *message.Message, error) {
pub, sub, closer, err := BuildNatsChannelDriver(&cfg)
pub, sub, closer, err := BuildNatsChannelDriver(&cfg.Nats)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("failed to build nats channel driver: %v", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,33 @@ import (
"github.com/rs/zerolog"

"github.com/mindersec/minder/internal/events/common"
natsinternal "github.com/mindersec/minder/internal/events/nats"
"github.com/mindersec/minder/pkg/eventer/constants"
)

func (r *reminder) getMessagePublisher(ctx context.Context) (message.Publisher, common.DriverCloser, error) {
switch r.cfg.EventConfig.Driver {
case constants.NATSDriver:
return r.setupNATSPublisher(ctx)
case constants.SQLDriver:
return r.setupSQLPublisher(ctx)
default:
return nil, nil, fmt.Errorf("unknown publisher type: %s", r.cfg.EventConfig.Driver)
}
}
Comment on lines +20 to +29
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm nervous about having this diverge from internal/events/events.go. Can we use an eventer.Interface from eventer.New() here?

(I need to do some refactoring in that package to move eventer/interface into eventer, but I'll wait until this code is landed to reduce the amount of conflicts.)


func (r *reminder) setupNATSPublisher(_ context.Context) (message.Publisher, common.DriverCloser, error) {
pub, _, cl, err := natsinternal.BuildNatsChannelDriver(&r.cfg.EventConfig.NatsConfig)
if err != nil {
return nil, nil, fmt.Errorf("failed to create NATS publisher: %w", err)
}
return pub, cl, nil
}

func (r *reminder) setupSQLPublisher(ctx context.Context) (message.Publisher, common.DriverCloser, error) {
logger := zerolog.Ctx(ctx)

db, _, err := r.cfg.EventConfig.Connection.GetDBConnection(ctx)
db, _, err := r.cfg.EventConfig.SQLPubConfig.Connection.GetDBConnection(ctx)
if err != nil {
return nil, nil, fmt.Errorf("unable to connect to events database: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/reminder/reminder.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func NewReminder(ctx context.Context, store db.Store, config *reminderconfig.Con
logger := zerolog.Ctx(ctx)
logger.Info().Msgf("initial repository cursor: %s", r.repositoryCursor)

pub, cl, err := r.setupSQLPublisher(ctx)
pub, cl, err := r.getMessagePublisher(ctx)
if err != nil {
return nil, err
}
Expand Down
11 changes: 11 additions & 0 deletions pkg/config/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,14 @@ func ReadKey(keypath string) ([]byte, error) {

return data, nil
}

// NatsConfig is the configuration when using NATS as the event driver
type NatsConfig struct {
// URL is the URL for the NATS server
URL string `mapstructure:"url" default:"nats://localhost:4222"`
// Prefix is the prefix for the NATS subjects to subscribe to
Prefix string `mapstructure:"prefix" default:"minder"`
// Queue is the name of the queue group to join when consuming messages
// queue groups allow multiple process to round-robin process messages.
Queue string `mapstructure:"queue" default:"minder"`
}
Comment on lines +152 to +162
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want to move this up to common (rather than just having a split between client and server configuration), let's put it in its own file. It feels kind of peculiar to have this in common while having the rest of the event configuration still in server, so I'd argue for moving the code back, particularly in the context of having smaller, more focused PRs.

14 changes: 8 additions & 6 deletions pkg/config/reminder/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ func TestValidateConfig(t *testing.T) {
MinElapsed: parseTimeDuration(t, "1h"),
},
EventConfig: reminder.EventConfig{
Connection: config.DatabaseConfig{
Port: 8080,
SQLPubConfig: reminder.SQLPubConfig{
Connection: config.DatabaseConfig{
Port: 8080,
},
},
},
},
Expand Down Expand Up @@ -153,10 +155,10 @@ func TestSetViperDefaults(t *testing.T) {
require.Equal(t, parseTimeDuration(t, "1h"), parseTimeDuration(t, v.GetString("recurrence.interval")))
require.Equal(t, 100, v.GetInt("recurrence.batch_size"))
require.Equal(t, parseTimeDuration(t, "1h"), parseTimeDuration(t, v.GetString("recurrence.min_elapsed")))
require.Equal(t, "reminder", v.GetString("events.sql_connection.dbname"))
require.Equal(t, "reminder-event-postgres", v.GetString("events.sql_connection.dbhost"))
require.Equal(t, "reminder-event-postgres", v.GetString("events.sql_connection.dbhost"))
require.Equal(t, "postgres", v.GetString("events.sql_connection.dbuser"))
require.Equal(t, "reminder", v.GetString("events.sql.connection.dbname"))
require.Equal(t, "reminder-event-postgres", v.GetString("events.sql.connection.dbhost"))
require.Equal(t, "reminder-event-postgres", v.GetString("events.sql.connection.dbhost"))
require.Equal(t, "postgres", v.GetString("events.sql.connection.dbuser"))
}

// TestOverrideConfigByEnvVar tests that the configuration can be overridden by environment variables
Expand Down
9 changes: 8 additions & 1 deletion pkg/config/reminder/events.go
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think of moving the reminder configuration into the same package as server, and calling them all "server-side components`? It feels like it would allow for more sharing between the two implementations.

Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,15 @@ import (

// EventConfig is the configuration for reminder's eventing system.
type EventConfig struct {
Driver string `mapstructure:"driver" default:"sql"`
SQLPubConfig SQLPubConfig `mapstructure:"sql"`
NatsConfig config.NatsConfig `mapstructure:"nats"`
}

// SQLPubConfig is the configuration for the SQL publisher
type SQLPubConfig struct {
// Connection is the configuration for the SQL event driver
//
// nolint: lll
Connection config.DatabaseConfig `mapstructure:"sql_connection" default:"{\"dbname\":\"reminder\",\"dbhost\":\"reminder-event-postgres\"}"`
Connection config.DatabaseConfig `mapstructure:"connection" default:"{\"dbname\":\"reminder\",\"dbhost\":\"reminder-event-postgres\"}"`
}
13 changes: 1 addition & 12 deletions pkg/config/server/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type EventConfig struct {
// Aggregator is the configuration for the event aggregator middleware
Aggregator AggregatorConfig `mapstructure:"aggregator"`
// Nats is the configuration when using NATS as the event driver
Nats NatsConfig `mapstructure:"nats"`
Nats config.NatsConfig `mapstructure:"nats"`
}

// GoChannelEventConfig is the configuration for the go channel event driver
Expand Down Expand Up @@ -54,14 +54,3 @@ type AggregatorConfig struct {
// This is the threshold between rule evaluations + actions.
LockInterval int64 `mapstructure:"lock_interval" default:"30"`
}

// NatsConfig is the configuration when using NATS as the event driver
type NatsConfig struct {
// URL is the URL for the NATS server
URL string `mapstructure:"url" default:"nats://localhost:4222"`
// Prefix is the prefix for the NATS subjects to subscribe to
Prefix string `mapstructure:"prefix" default:"minder"`
// Queue is the name of the queue group to join when consuming messages
// queue groups allow multiple process to round-robin process messages.
Queue string `mapstructure:"queue" default:"minder"`
}
Loading