A scalable message broker to run applications like messenger, pubsub etc. in internet scale. It incorporates MQTT protocol but not too strictly in semantics. There are also a couple of syntactic extensions on top of the protocol and some fancy features as a server. But basically, any MQTT client should be able to communicate with fubar.
- Wildcard(*) subscription is not supported.
- Keep-alive negotiation extension: Fubar may respond CONNACK against CONNECT with 2-byte keep-alive payload. The client is recommended to use this value as the new ping interval.
- Alternate server extension: Fubar may respond CONNACK with a new alt-server code (6) and another server's address in the payload. The client is supposed to connect to the alt-server instead.
- Direct messaging: A client ID can be used in place of a topic to send messages directly to the client rather than to a topic.
- Tracing: An administrator may monitor all the messages (with timings) to/from specific client IDs or topics.
- [Experimental] Websocket support
$ make deps
$ cd deps/lager
$ make
$ cd ../..
$ make
Note that lager may require independent build as it uses parse_transform.
$ make run
There are also some command line parameters.
node=fubar
default, node namemqtt_port=1883
default, mqtt service portmqtts_port=8883
mqtt over ssl service porthttp_port=80
mqtt over websocket service portmaster=fubar@remote
remote node to cluster withcookie=xxxxxx
security cookie string
You can check the broker state.
$ make state
== fubar@host running ==
name : fubar
description : "Scalable MQTT message broker"
version : "3.0.0"
listeners : [{mqtt,[{connections,0},{acceptors,1024}]}]
nodes : [fubar@host]
routes : 0
memory : [{'MB',213},
{load,0.03377922962411755},
{min_load,{fubar@host,0.03377922962411755}}]
$ make account-set username=test password=test
$ make debug
1> mqtt_client_simple:connect([{client_id,<<"c1">>}, {username,<<"test">>}, {password,<<"1234">>}]).
2> mqtt_client_simple:connect([{client_id,<<"c2">>}, {username,<<"test">>}, {password,<<"1234">>}]).
3> mqtt_client:send(<<"c1">>, mqtt:subscribe([{topics, [<<"t1">>]}])).
4> mqtt_client:send(<<"c2">>, mqtt:publish([{topic, <<"t1">>}, {payload, <<"hello!">>}]).
This is an extra feature not originally stated in MQTT specification. One client may use the other client's client id as a topic name to send a message directly to the client.
5> mqtt_client:send(<<"c2">>, mqtt:publish([{topic, <<"c1">>}, {payload, <<"world!">>}]).
Press CTRL+C
twice.
make stop
$ make run master=name@host
Note) If you want to start more than one broker node in a computer, You have to use different node name and listen port as:
$ make run master=name@host node=other mqtt_port=1884
Manage account using make commands like,
$ make account-set username=romeo password=1234
$ make account-del username=romeo
$ make account-all
== fubar@host account all ==
[<<"romeo">>,<<"juliet">>]
$
{auth, mqtt_account}
line can be commented out from fubar.config to
disable account authentication. In this case, anyone may connect
the broker without username/password.
Manage access control list using make commands like,
$ make acl-set ip=127.0.0.1 allow=true
$ make acl-set ip=192.168.0.100 allow=false
$ make acl-all
== fubar@host account all ==
[{127,0,0,1},{192,168,0,100}]
$
If an ACL is set to allow=true
, any client from the address may
connect the broker without username/password. If an ACL is set to
allow=false
, no client from the address may connect the broker even
with correct username/password.
Full list of client parameters are:
1> C3 = mqtt_client_simple:connect([
{hostname, "localhost"},
{port, 1884},
{username, <<"romeo">>},
{password, <<"1234">>},
{client_id, <<"c1">>},
{keep_alive, 60},
clean_session,
{will_topic, <<"will">>},
{will_message, <<"bye-bye">>},
{will_qos, at_least_once},
will_retain,
socket_options, [...]]).
Refer inet:setopts/2
for socket_options
above.
Direct messages are traceable.
$ make trace client_id=c1 on=true
$ make trace client_id=c1 on=false
Once set, direct messages to/from the client are printed to log/console.log with TRACE tag in notice level.
The broker restores all the state -- accounts, topics and subscriptions - on restart. To clear this state, do:
$ make reset
$ cd priv/ssl/ca
$ mkdir certs private
$ chmod 700 private
$ echo 01 > serial
$ touch index.txt
$ openssl req -x509 -config openssl.cnf -newkey rsa:2048 -days 365 -out cacert.pem -outform PEM -subj /CN=fubar_ca/ -nodes
$ openssl x509 -in cacert.pem -out cacert.cer -outform DER
$ openssl genrsa -out ../key.pem 2048
$ openssl req -new -key ../key.pem -out ../req.pem -outform PEM -subj /CN=SomeHostName/O=YourOrganizationName/ -nodes
$ openssl ca -config openssl.cnf -in ../req.pem -out ../cert.pem -notext -batch -extensions server_ca_extensions
$ openssl pkcs12 -export -in ../cert.pem -out ../keycert.p12 -inkey ../key.pem -passout pass:YourPassword
Entire process is stored in batch script priv/ssl/ca/new-certs.sh. So you can just,
$ cd priv/ssl/ca
$ ./new-certs.sh
Use mqtts_port
command line parameter to start an SSL listener when
starting the broker.
$ cd ../../..
$ make test mqtts_port=8883
(fubar@host)1>
Test basic connection with openssl s_client.
$ openssl s_client -connect localhost:8883
If it succeeds, use {transport, ranch_ssl}
option on the client side.
$ make client
1> ssl:start().
2> mqtt_client_simple:connect([{port, 8883}, {transport, ranch_ssl}, {client_id, <<"ssltest">>}]).
Fubar nodes consisting of a cluster automatically offload one another to make loads as even as possible. This feature incorporates an MQTT protocol extension alternative server mechanism which is explained in the last chapter.
The load balancing behavior is controlled with several parameters in fubar_sysmon
module.
The node may use as much as memory_limit (high_watermark * physical system memory) at most. Once the memory consumption exceeds this boundary, the node offloads to another node in the cluster or just drops all subsequent connections.
The node compares it's load (memory usage / memory_limit) with other nodes' load to activate offloading. If node A's load is higher than node B's load times offloading_threshold T (La > Lb * T), the node A offloads to the node B.
While offloading, the target can be changed when another node's load appears with lower load. And the node A stops offloading when La becomes greater than or equal to the load of current offloading target Lb.
fubar_sysmon
periodically checks system memory profile. This value controls
the check interval in milliseconds.
The broker configuration is stored in fubar.config. You can edit the file
and call fuabr:load_config/0
from the shell or call fubar:settings/2
to
apply changes while the broker is running.
(fubar@host)1> fubar:settings(mqtt_protocol, {max_packet_size, 8192}).
The above sample shows changing maximum MQTT packet size to 8kB. This change affects all the clients connection from this point on. Note that some change applies immediately but others require further intervention.
Refer fubar.config for more information.
There are two tests ready. One is unit test and the other is common test.
$ make check
The above command executes unit tests included in each source module and prints verbose results on the standard output as,
======================== EUnit ========================
mqtt_protocol:928: parse_success_test_ (connect)...ok
mqtt_protocol:941: parse_success_test_ (connect+extra)...ok
mqtt_protocol:954: parse_success_test_ (connect followed by more octets)...ok
...
=======================================================
All 45 tests passed.
Cover analysis: ~/.eunit/index.html
And there is a comprehensive report about the most recent test at ~/.eunit/index.html.
$ make test
Common test is different from unit test in that it lauches test servers to perform black-box and white-box tests. It is more thorough in scope of the tests than the unit test.
All the test history is collected under ~/logs. Open ~/logs/all_runs.html to browse test reports.
TBD
TBD