Skip to content

Commit 6ec5f27

Browse files
authored
Simple Message Broker (#50)
1 parent 06e2feb commit 6ec5f27

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

64 files changed

+15136
-306
lines changed

.vscode/launch.json

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,38 @@
145145
"DRAFT_CONFIG": "tests/blueprint/node_3.yaml"
146146
},
147147
},
148+
{
149+
"name": "Catalyst",
150+
"type": "go",
151+
"request": "launch",
152+
"mode": "auto",
153+
"program": "services/core/catalyst/main.go",
154+
"cwd": "${workspaceFolder}",
155+
"debugAdapter": "dlv-dap",
156+
"env": {
157+
"DRAFT_CONFIG": "tests/catalyst/config.yaml"
158+
},
159+
},
160+
{
161+
"name": "dctl - Produce",
162+
"type": "go",
163+
"request": "launch",
164+
"mode": "auto",
165+
"program": "tools/dctl/main.go",
166+
"cwd": "${workspaceFolder}",
167+
"debugAdapter": "dlv-dap",
168+
"args": ["broker", "produce"],
169+
},
170+
{
171+
"name": "dctl - Consume",
172+
"type": "go",
173+
"request": "launch",
174+
"mode": "auto",
175+
"program": "tools/dctl/main.go",
176+
"cwd": "${workspaceFolder}",
177+
"debugAdapter": "dlv-dap",
178+
"args": ["broker", "consume"],
179+
},
148180
{
149181
"name": "Fuse",
150182
"type": "go",

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ Draft is a framework for easily building reliable, efficient, and scalable distr
88

99
Blueprint is a core service that provides both a service registry and a generic key/value store for dynamic service configuration. All processes in a Draft cluster register themselves with Blueprint at startup so that Blueprint can manage all service configuration and provide a single view into the status of the cluster.
1010

11+
### Catalyst
12+
Catalyst is a core service that acts as the primary message broker, and actor system. Services, and clients can `Produce` and `Consume` [CloudEvent](https://cloudevents.io/) messages. Currently a simple `Broadcast` message delivery is implemented so each `Consumer` of the same channel will receive the message at the same time.
13+
1114
### Fuse
1215

1316
Fuse is a core service that enabled routing between Draft processes as well as ingress into the cluster. It manages an installation of [Envoy](https://www.envoyproxy.io/) to route requests from clients to services.

services/core/blueprint/web-client/package-lock.json

Lines changed: 5 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

services/core/blueprint/web-client/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
"@mui/material": "^5.14.20",
2525
"@reduxjs/toolkit": "^2.0.1",
2626
"antd": "^5.21.5",
27-
"api": "https://gitpkg.vercel.app/steady-bytes/draft/api?main",
27+
"api": "https://gitpkg.vercel.app/steady-bytes/draft/api?api/v0.6.0",
2828
"localforage": "^1.10.0",
2929
"match-sorter": "^6.3.1",
3030
"react": "^18.2.0",

services/core/catalyst/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# Catalyst
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package broker
2+
3+
import (
4+
"encoding/base64"
5+
"sync"
6+
7+
"connectrpc.com/connect"
8+
acv1 "github.com/steady-bytes/draft/api/core/message_broker/actors/v1"
9+
)
10+
11+
type (
12+
atomicMap struct {
13+
mu sync.RWMutex
14+
// Store the routine client connection
15+
m map[string][]*connect.ServerStream[acv1.ConsumeResponse]
16+
// yield the client connection to a thread, and then send events to it
17+
n map[string]chan *acv1.CloudEvent
18+
}
19+
)
20+
21+
func newAtomicMap() *atomicMap {
22+
return &atomicMap{
23+
mu: sync.RWMutex{},
24+
m: make(map[string][]*connect.ServerStream[acv1.ConsumeResponse]),
25+
n: make(map[string]chan *acv1.CloudEvent),
26+
}
27+
}
28+
29+
// hash to calculate the same key for two strings
30+
func (am *atomicMap) hash(msgKindName string) string {
31+
bs := []byte(msgKindName)
32+
return base64.StdEncoding.EncodeToString(bs)
33+
}
34+
35+
func (am *atomicMap) Insert(key string, resStream *connect.ServerStream[acv1.ConsumeResponse]) {
36+
// TODO: Figure out how to start with a read lock?
37+
am.mu.RLock()
38+
defer am.mu.RUnlock()
39+
list, ok := am.m[key]
40+
if !ok {
41+
am.mu.RUnlock()
42+
am.mu.Lock()
43+
defer am.mu.Unlock()
44+
var list []*connect.ServerStream[acv1.ConsumeResponse]
45+
list = append(list, resStream)
46+
am.m[key] = list
47+
} else {
48+
list = append(list, resStream)
49+
am.m[key] = list
50+
}
51+
}
52+
53+
func (am *atomicMap) Broker(key string, resStream *connect.ServerStream[acv1.ConsumeResponse]) {
54+
am.mu.RLock()
55+
ch, found := am.n[key]
56+
if !found {
57+
// create the channel to add to map
58+
ch := make(chan *acv1.CloudEvent)
59+
// store channel in map for future connections
60+
am.mu.RUnlock()
61+
am.mu.Lock()
62+
am.n[key] = ch
63+
am.mu.Unlock()
64+
// now start a new routine and keep it open as long as the `ch` channel has connected clients
65+
go am.send(ch, resStream)
66+
67+
return
68+
} else {
69+
// the channel is already made and shared with other consumers, and producers so we can just use `ch`
70+
go am.send(ch, resStream)
71+
am.mu.RUnlock()
72+
}
73+
}
74+
75+
func (am *atomicMap) send(ch chan *acv1.CloudEvent, stream *connect.ServerStream[acv1.ConsumeResponse]) {
76+
// when the channel receives a message send to the stream the client is holding onto
77+
for {
78+
m := <-ch
79+
msg := &acv1.ConsumeResponse{
80+
Message: m,
81+
}
82+
stream.Send(msg)
83+
}
84+
}
85+
86+
func (am *atomicMap) Broadcast(key string, msg *acv1.CloudEvent) {
87+
ch, ok := am.n[key]
88+
if ok {
89+
ch <- msg
90+
} else {
91+
// we don't have any consumers that will listen to the message so as of right now
92+
// the message is not worth sending
93+
94+
// TODO: We might consider a dead letter queue
95+
return
96+
}
97+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
package broker
2+
3+
import (
4+
"context"
5+
6+
"connectrpc.com/connect"
7+
acv1 "github.com/steady-bytes/draft/api/core/message_broker/actors/v1"
8+
)
9+
10+
type (
11+
Consumer interface {
12+
Consume(ctx context.Context, msg *acv1.CloudEvent, stream *connect.ServerStream[acv1.ConsumeResponse]) error
13+
}
14+
15+
consumer struct {
16+
consumerRegistrationChan chan register
17+
}
18+
)
19+
20+
func NewConsumer(consumerRegistrationChan chan register) Consumer {
21+
return &consumer{
22+
consumerRegistrationChan: consumerRegistrationChan,
23+
}
24+
25+
}
26+
27+
func (c *consumer) Consume(ctx context.Context, msg *acv1.CloudEvent, stream *connect.ServerStream[acv1.ConsumeResponse]) error {
28+
// fling the consumer stream into the controller
29+
c.consumerRegistrationChan <- register{
30+
CloudEvent: msg,
31+
ServerStream: stream,
32+
}
33+
34+
return nil
35+
}
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package broker
2+
3+
import (
4+
"connectrpc.com/connect"
5+
acv1 "github.com/steady-bytes/draft/api/core/message_broker/actors/v1"
6+
"github.com/steady-bytes/draft/pkg/chassis"
7+
)
8+
9+
type (
10+
Controller interface {
11+
Consumer
12+
Producer
13+
}
14+
15+
controller struct {
16+
Producer
17+
Consumer
18+
19+
logger chassis.Logger
20+
21+
state *atomicMap
22+
}
23+
24+
register struct {
25+
*acv1.CloudEvent
26+
*connect.ServerStream[acv1.ConsumeResponse]
27+
}
28+
)
29+
30+
func NewController(logger chassis.Logger) Controller {
31+
var (
32+
producerMsgChan = make(chan *acv1.CloudEvent)
33+
consumerRegistrationChan = make(chan register)
34+
)
35+
36+
ctr := &controller{
37+
NewProducer(producerMsgChan),
38+
NewConsumer(consumerRegistrationChan),
39+
logger,
40+
newAtomicMap(),
41+
}
42+
43+
// TODO: This could contain more configuration. Like maybe reading the number
44+
// of cpu cores to spread the works over?
45+
46+
go ctr.produce(producerMsgChan)
47+
go ctr.consume(consumerRegistrationChan)
48+
49+
return ctr
50+
}
51+
52+
const (
53+
LOG_KEY_TO_CH = "key to channel"
54+
)
55+
56+
func (c *controller) produce(producerMsgChan chan *acv1.CloudEvent) {
57+
for {
58+
msg := <-producerMsgChan
59+
c.logger.WithField("msg: ", msg).Info("produce massage received")
60+
61+
// make hash of <domain><msg.Type.String>
62+
key := c.state.hash(string(msg.ProtoReflect().Descriptor().FullName()))
63+
64+
// do I save to blueprint?
65+
// - default config is to be durable
66+
// - the producer can also add configuration to say not to store
67+
68+
// send the received `Message` to all `Consumers` for the same key
69+
c.logger.WithField("key", key).Info(LOG_KEY_TO_CH)
70+
c.state.Broadcast(key, msg)
71+
}
72+
}
73+
74+
// consume - Will create a hash of the message domain, and typeUrl then save the msg.ServerStream to `atomicMap.m`
75+
// that can be used to `Broadcast` messages to when a message is produced. Con's to this approach are a `RWMutex`
76+
// has to be used to `Broadcast` the message so the connected stream.
77+
// func (c *controller) consume(reg chan register) {
78+
// for {
79+
// msg := <-reg
80+
// fmt.Print("Receive a request to setup a consumer", msg)
81+
82+
// // make hash of <domain><msg.Type.String>
83+
// key := c.hash(msg.GetDomain(), msg.GetKind().GetTypeUrl())
84+
85+
// // use hash as key if the hash does not exist, then create a slice of connections
86+
// // and append the connection to the slice
87+
// c.state.Insert(key, msg.ServerStream)
88+
// }
89+
// }
90+
91+
// consume - Will create a hash of the message domain, and typeUrl to use as a key to a tx, and rx sides of a channel
92+
// the `tx` or transmitter will be used when a producer produces an event to send the event to each client that is consuming
93+
// events of the domain, and typeUrl.
94+
func (c *controller) consume(registerChan chan register) {
95+
for {
96+
// create a shared channel that will receive any kind of message of that domain, and typeUrl
97+
// add the receiver to a go routine that will keep the `ServerStream` open and send any messages
98+
// received up to the client connect.
99+
msg := <-registerChan
100+
c.logger.WithField("msg", msg).Info("consume channel registration")
101+
102+
key := c.state.hash(string(msg.ProtoReflect().Descriptor().FullName()))
103+
104+
// key := c.state.hash(msg.GetDomain(), msg.GetKind().GetTypeUrl())
105+
c.logger.WithField("key", key).Info(LOG_KEY_TO_CH)
106+
c.state.Broker(key, msg.ServerStream)
107+
}
108+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package broker
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"io"
8+
9+
acv1 "github.com/steady-bytes/draft/api/core/message_broker/actors/v1"
10+
11+
"connectrpc.com/connect"
12+
)
13+
14+
type (
15+
Producer interface {
16+
Produce(ctx context.Context, inputStream *connect.BidiStream[acv1.ProduceRequest, acv1.ProduceResponse]) error
17+
}
18+
19+
producer struct {
20+
producerChan chan *acv1.CloudEvent
21+
}
22+
)
23+
24+
func NewProducer(produceChan chan *acv1.CloudEvent) Producer {
25+
return &producer{
26+
producerChan: produceChan,
27+
}
28+
}
29+
30+
// Accepts an incomming bidirectional stream to keep open and push incomming
31+
// messages into the broker when a message is `produce`'ed
32+
func (p *producer) Produce(ctx context.Context, inputStream *connect.BidiStream[acv1.ProduceRequest, acv1.ProduceResponse]) error {
33+
for {
34+
if err := ctx.Err(); err != nil {
35+
return err
36+
}
37+
38+
request, err := inputStream.Receive()
39+
if err != nil && errors.Is(err, io.EOF) {
40+
return nil
41+
} else if err != nil {
42+
return fmt.Errorf("receive request: %w", err)
43+
}
44+
45+
p.producerChan <- request.GetMessage()
46+
}
47+
}

0 commit comments

Comments
 (0)