Skip to content

Commit b6332bb

Browse files
authored
Make it possible to have a MFA transport (#17)
1 parent a8563f3 commit b6332bb

File tree

2 files changed

+14
-8
lines changed

2 files changed

+14
-8
lines changed

src/mqtt_sessions.erl

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@
7676

7777
-type session_ref() :: pid() | binary().
7878
-type msg_options() :: #{
79-
transport => pid() | function(),
79+
transport => transport(),
8080
peer_ip => tuple() | undefined,
8181
context_prefs => map(),
8282
connection_pid => pid()
@@ -94,6 +94,8 @@
9494

9595
-type callback() :: pid() | {module(), atom(), list()}.
9696

97+
-type transport() :: function() | callback().
98+
9799
-export_type([
98100
session_ref/0,
99101
msg_options/0,
@@ -102,7 +104,8 @@
102104
subscriber/0,
103105
subscriber_options/0,
104106
topic/0,
105-
callback/0
107+
callback/0,
108+
transport/0
106109
]).
107110

108111
-define(SIDEJOBS_PER_SESSION, 20).
@@ -188,7 +191,7 @@ update_user_context(Pool, ClientId, Fun) ->
188191
{error, _} = Error -> Error
189192
end.
190193

191-
-spec get_transport( pid() ) -> {ok, pid()} | {error, notransport | noproc}.
194+
-spec get_transport( pid() ) -> {ok, transport()} | {error, notransport | noproc}.
192195
get_transport(SessionPid) ->
193196
mqtt_sessions_process:get_transport(SessionPid).
194197

src/mqtt_sessions_process.erl

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,14 +68,15 @@
6868

6969
-type packet_id() :: 0..65535. % ?MAX_PACKET_ID
7070

71+
7172
-record(state, {
7273
protocol_version :: mqtt_packet_map:mqtt_version(),
7374
pool :: atom(),
7475
runtime :: atom(),
7576
client_id :: binary(),
7677
routing_id :: binary(),
7778
user_context :: term(),
78-
transport = undefined :: pid() | function() | undefined,
79+
transport = undefined :: mqtt_sessions:transport() | undefined,
7980
connection_pid = undefined :: pid() | undefined,
8081
is_session_present = false :: boolean(),
8182
pending_connack = undefined :: term(),
@@ -145,7 +146,7 @@ update_user_context(Pid, Fun) ->
145146
{error, noproc}
146147
end.
147148

148-
-spec get_transport( pid() ) -> {ok, pid()} | {error, notransport | noproc}.
149+
-spec get_transport( pid() ) -> {ok, mqtt_sessions:transport()} | {error, notransport | noproc}.
149150
get_transport(Pid) ->
150151
try
151152
gen_server:call(Pid, get_transport, infinity)
@@ -230,8 +231,8 @@ handle_call({update_user_context, Fun}, _From, #state{ user_context = UserContex
230231

231232
handle_call(get_transport, _From, #state{ transport = undefined } = State) ->
232233
{reply, {error, notransport}, State};
233-
handle_call(get_transport, _From, #state{ transport = TransportPid } = State) ->
234-
{reply, {ok, TransportPid}, State};
234+
handle_call(get_transport, _From, #state{ transport = Transport } = State) ->
235+
{reply, {ok, Transport}, State};
235236

236237
handle_call({incoming_data, NewData, ConnectionPid}, _From, #state{ incoming_data = Data, connection_pid = ConnectionPid } = State) ->
237238
Data1 = << Data/binary, NewData/binary >>,
@@ -1030,7 +1031,9 @@ send_transport(Msg, #state{ transport = Pid }) when is_pid(Pid) ->
10301031
ok
10311032
end;
10321033
send_transport(Msg, #state{ transport = Fun }) when is_function(Fun) ->
1033-
Fun(Msg).
1034+
Fun(Msg);
1035+
send_transport(Msg, #state{ transport = {M, F, A} }) ->
1036+
erlang:apply(M, F, [Msg | A]).
10341037

10351038

10361039
%% @doc Queue a message, extract, type, message expiry, and QoS

0 commit comments

Comments
 (0)