You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
docs(rumqttd/architecture): added ontology description and feature description (#635)
* docs(rumqttd/architecture): added ontology description and feature description
Enhanced the description of the current architecture of `rumqttd`
* docs(rumqttd/architecture): made docs more specific
* docs(rumqttd/architecture) clarified router start semantics
* docs(rumqttd/architecture): clarified link specifics and datapath
- Router: Data storage + routing logic. Connections (links) forward data to router
3
+
---
5
4
5
+
- Router: Data storage + routing logic. Connections (links) forward data to router
6
6
7
7
## Initialization
8
8
9
9
The `Broker` when created spawns a `Router` which listens on the receiver for any events. When `Broker::start()` is called, a TCP listener is spawned which listens for MQTT connections.
10
10
11
-
12
11
## `RemoteLink` and `Network`
13
12
14
13
`LinkRx` is responsible for sending `Notification` from `Router` to `RemoteLink` and `LinkTx` is responsible for sending `Event`s from `RemoteLink` to `Router`, though `RemoteLink` is not the only thing sending `Event`s to `Router`.
15
14
16
15
Whenever a new connection is received, a `Network` is spawned to handle the connection stream, and a `LinTx`-`LinkRx` pair wrapped in a `RemoteLink` is spawned which is responsible to:
16
+
17
17
1. initiate he MQTT connection using the corresponding `Network` and registering the connection with the `Router`.
18
18
2. asynchronously await on
19
-
- `Network` for reading packets
20
-
- `LinkRx` for notifications from router which need to be forwarded to `Network`.
19
+
-`Network` for reading packets
20
+
-`LinkRx` for notifications from router which need to be forwarded to `Network`.
21
21
22
22
There are separate shared buffers (`Arc<Mutex<..>>`s) for `LinkTx` and `LinkRx` which is used to actually send notifications and events over, and the mpsc channel is just used to notify that there is something present in the shared buffer.
23
23
24
-
25
24
## `Connection`
26
25
27
26
`Connection` is a struct used by `Router` to store all the metadata about a connection, as well as manage a QoS1 buffer for the same. `Connection` also holds onto the mpsc receiver which is used to notify the `RemoteLink` that are there some new notifications to send over to `Network`.
28
27
29
-
30
28
## `Router::readyqueue`
31
29
32
30
`Router::readyqueue` stores all the connections to which `Router` has some notifications to forward to, whether these are acks or publish messages.
33
31
34
-
35
32
## `DataRequest`
36
33
37
34
`DataRequest` represent a connection's request to fetch the data off from a topic it has subscribed to. The connection hasn't actually sent any request except the subscribe packet at start, but these are instead used to continuously read data off the logs to ensure that connection is up-to-date.
38
35
39
-
40
36
## `Tracker` (`Router::trackers`)
41
37
42
38
`Tracker` stores to current status of a given connection's `DataRequest`s. It has various unschedule flags, one if which is set whenever the connection is removed from the `Router::reaydqueue`, the reason for which in majority of cases is because the connection has caught up the with topic (`Tracker::caughtup_unschedule` flag is set in this case). Tracker also keeps all the `DataRequest`s for the given connection, as well as whether there are any pending acks for the connection in the `Router::ackslog`.
`Router::datalog`, for each valid topic, stores all the pending `DataRequest`s. Note that these data requests pending because the corresponding connection has caught up with the logs. These `DataRequest`s being here means that they are not stored in `Router::trackers` when `Tracker::caughtup_unschedule` is set.
48
43
49
-
50
44
## `Router`
51
45
52
46
`Router::run()` loops over `Router::run_inner()`, which:
47
+
53
48
- if `Router::reaydqueue` is empty, blockingly wait over events receiver to prevent spinning in loop. There is nothing to send over to `RemoteLink`, and thus blocking here is fine.
54
49
- if `Router::readyqueue` is not empty, parses 0-500 events from receiver. This means that if there are no events to be parsed, router moves on to sending notifications over to `RemoteLink`.
55
50
56
51
`Router::consume(id: ConnectionId)` is the function where we for the given `id`:
52
+
57
53
1. send all the pending acks
58
54
2. send all the pending data, that is, all the messages from topics the connection has subscribed to, if any.
59
55
60
-
61
56
### New Connection Event
62
57
63
58
Whenever a new connection is formed (which `Router` gets informed of via `Event::Connect`), the router:
59
+
64
60
1. checks if adding new connection exceeds the total number of allowed connections or not.
65
61
2. if clean session is not requested, retrieve the any pending data and acks that were not sent
66
-
- **NOTE:** The metrics of the previous session are still retrieved, regardless of whether a new session was requested or not.
62
+
-**NOTE:** The metrics of the previous session are still retrieved, regardless of whether a new session was requested or not.
67
63
3. necessary initializations are made
68
64
4. finally a connack is sent back
69
65
70
66
When a new connection is added, we don't add the corresponding `ConnectionId` to the `readyqueue` because we directly append the connack to the shared buffer and notify `RemoteLink` here itself instead of waiting for `Router::consume` to do so.
71
67
72
-
73
68
### Device Data Event
74
69
75
70
Whenever the `RemoteLink` reads off some bytes from `Network`, it appends it to the shared buffer and notifies the `Router` via `Event::DeviceData`. The router parses these bytes as some MQTT packets and performs the required actions.
@@ -94,7 +89,6 @@ These packets are registed with the QoS1 buffer in `Connection`, and if the ack
94
89
95
90
`PingReq` and disconnect packets are also parsed, rest are ignored.
96
91
97
-
98
92
### Disconnect Event
99
93
100
94
`Router::handle_disconnection()` is called here, as well as within data events when disconnection is required. The connection is removed, and so is corresponding acks in `Router::ackslog`. All the data requests in waiters is pushed back to tracker, and tracker itself is saved in graveyard if clean session is not asked.
@@ -103,33 +97,39 @@ These packets are registed with the QoS1 buffer in `Connection`, and if the ack
103
97
104
98
When a connection's buffer is full, the router pushes an unschedule notification at the last of the buffer. So whenever the `RemoteLink` encounters unschedule notification, it sends a ready event to router to let it know that buffer has now free space for more notifications.
105
99
106
-
107
100
## State machine transitions
108
-
----------------------------
101
+
102
+
---
109
103
110
104
NOTE: Transition to ConnectionReady will schedule the connection for consumption
111
-
- Send acks (connack, suback, puback, pubrec ...) and data forwards
112
-
- Possible output states
113
-
- ConnectionCaughtup
114
-
- ConnectionBusy
115
-
- InflightFull
105
+
106
+
- Send acks (connack, suback, puback, pubrec ...) and data forwards
107
+
- Possible output states
108
+
109
+
- ConnectionCaughtup
110
+
- ConnectionBusy
111
+
- InflightFull
116
112
117
113
- New connection (event)
114
+
118
115
- New tracker initialized with ConnectionBusy
119
116
- ConnectionBusy -> ConnectionReady
120
117
121
118
- New subscription
119
+
122
120
- Initialize tracker with requests
123
121
- if ConnectionCaughtUp
124
122
- ConnectionCaughtUp -> ConnectionReady
125
123
126
124
- New publish
125
+
127
126
- Write to a filter/s
128
127
- Reinitialize trackers with parked requests
129
128
- If ConnectionCaughtUp
130
129
- ConnectionCaughtUp -> ConnectionReady
131
130
132
131
- Acks
132
+
133
133
- Handle outgoing state
134
134
- If InflightFull
135
135
- ConnectionCaughtUp -> ConnectionReady
@@ -139,5 +139,95 @@ NOTE: Transition to ConnectionReady will schedule the connection for consumption
139
139
- If ConnectionBusy
140
140
- ConnectionBusy -> ConnectionReady
141
141
142
+
## Basic Concepts and Procedures
143
+
144
+
### Ontology
145
+
146
+
Here are a few terms to understand before we move on. These terms are used internally, within the codebase, to organize the files and modules.
147
+
148
+
#### Core Concepts and Entities
149
+
150
+
0. Input and Output Queues (`ibufs` and `obufs`)
151
+
152
+
The datapath of `rumqttd` are two instances of `VecDequeue` from `std::collections`. They are [`Slab`](https://docs.rs/slab/latest/slab/) datatypes, which pre-allocates storage. Everything else just manages these and their interfaces to various other parts of the system. The queues are as follows -
153
+
154
+
-`ibufs`: All incoming data to `rumqttd` (represented by a `Packet`) is stored immediately in `ibufs` (which holds a `VecDequeue`)
155
+
-`obufs`: All outgoing data `rumqttd` (represented by `Notification`) is stored immediately in the `obufs` (which holds a `VecDequeue`).
156
+
157
+
`obufs` also has an _in-flight queue_ to track packets that have been sent via the network but have not been acknowledged.
158
+
159
+
1. Broker
160
+
161
+
Top-level entity that handles _everything_, including, but not limited to
162
+
163
+
- Configuring and creating the Router
164
+
- Starting the Router
165
+
- Maintaining a channel that communicates with the router
166
+
167
+
When you create a new Broker, a new Router is also automatically created.
168
+
169
+
When you start the Broker, the following happens ->
170
+
171
+
1. The Router is started, and we recieve a channel (`router_tx`), which all other Links and Servers pass `Event`s into
172
+
2. (on a new thread) Creates the metrics server
173
+
3. (on a new thread) Starts all `mqttv4` servers
174
+
4. (on a new thread) Starts all `mqttv5` servers
175
+
5. (on a new thread, if enabled) Starts all `websocket` servers
176
+
6. (on a new thread) Starts the prometheus listener
177
+
7. (on the same thread) Starts the ConsoleLink to allow HTTP queries to monitor and manage the broker itself.
178
+
179
+
2. Router
180
+
181
+
Entity created by the broker that controls the flow of data in `rumqttd`. The router is responsible for managing, authorizing, and scheduling the flow of data between components within and connected to `rumqttd`.
182
+
183
+
<!-- At any given time, the router's event loop must choose between handling incoming messages or sending outgoing messages. -->
184
+
185
+
The router is built on the reactor pattern, where it works primarily by _reacting_ to events that happen. As and when an event is added to `ibufs`, the router reacts by checking its current state (for a certain connection or message) and dispatching the appropriate action. This can involve changing other in-memory data structures, or writing the message to `obufs` to make sure it's handled by the appropriate link.
186
+
187
+
An action of any sort on the router's end which involves communication usually involves the router adding a structure or packet to one of these buffers.
188
+
189
+
The router also shares Channels, which do not contain the events themselves, but rather contain notifications to events that might be added into the shared event buffers. This also has the added benefit of preventing lock contention between different futures/tasks, which would've happened if they had to keep checking the buffers
190
+
191
+
When you create the router, the following happens ->
192
+
193
+
1. Transmission Channels get created to and from the router
194
+
2. A `Graveyard` to store information to persist connections
195
+
3. A tracker for All the alerts and metrics that are necessary to be handled
196
+
4. A Map between subscriptions and devices gets created
197
+
5. The two communication buffers `ibufs` and `obufs` get initialized
198
+
6. Logs for message acknowledgements, subscriptions, and alerts get created
199
+
7. A Packet Cache is created
200
+
8. The Scheduler struct is created
201
+
202
+
Once all necessary things have been initialized, the `spawn` method intializes the router's event loop on a separate thread.
203
+
This method returns a cloneable transmission channel that the Broker passes to all links so they can communicate with the router.
204
+
205
+
3. Link
206
+
207
+
An Asynchronous and usually network-facing entity that Handles transmission of information to and from the Router. Information here can mean things including but not limited to
208
+
209
+
1. Messages incoming from devices (publish)
210
+
2. Messages outgoing to devices (subscribe)
211
+
3. Information about metrics
212
+
4. Other events such as pings
213
+
214
+
There are various types of Links. Here they are, explored in minimal detail -
215
+
216
+
1. Bridge -> Can be used to link two `rumqttd` instances for high availability.
217
+
2. Alerts -> Since `rumqttd` is embeddable, this allows developers to create custom hooks to handle alerts that `rumqttd` gives out.
218
+
3. Console -> Creates a lightweight HTTP Server to obtain information and metrics from the `rumqttd` instance.
219
+
4. Local -> Contains functions to perform Publish and Subscribe actions to the router. This is used inside RemoteLink.
220
+
5. Meters -> Creates a link specifically to consume metrics, is useful for external observability or hooks
221
+
6. Remote -> `RemoteLink` is an asynchronous entity that represents a network connection. It provides its own event loop, which does the following -
222
+
- Reads from the network socket, writes to `ibufs` and notifies the router
223
+
- Writes to the network socket when required
224
+
7. Timer -> Handles timing actions for metrics, and sends `Event`s related to alerts and metrics to their respective links.
142
225
226
+
#### Folder Structure
143
227
228
+
-`link` -> Contains all the different `link`s and their individual logic
229
+
-`protocol` -> Handles serializing/deserializing MQTT v4 and v5 Packets while providing a unified abstraction to process the same. Pinned version of [mqttbytes](https://github.com/bytebeamio/mqttbytes)
230
+
-`replicator` (not in use) -> Contains primitives for a clustered setup
231
+
-`router` -> Contains the logic for the router, central I/O Buffers, Connection and Client Management, Scheduler, Timer
232
+
-`segments` -> Contains the logic for disk persistence and Memory Management
233
+
-`server` -> Contains the Broker, TLS Connection Logic, and some Commmon Types
0 commit comments