Skip to content

Commit 7707787

Browse files
authored
Fix a problem where the session could persist after disconnect (#26)
* Fix a problem where the session could persist after disconnect * Ensure that will is published on missing connection
1 parent 056c2e0 commit 7707787

File tree

2 files changed

+37
-13
lines changed

2 files changed

+37
-13
lines changed

src/mqtt_sessions_process.erl

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
%% @author Marc Worrell <[email protected]>
2-
%% @copyright 2018-2024 Marc Worrell
2+
%% @copyright 2018-2025 Marc Worrell
33
%% @doc Process handling one single MQTT session.
44
%% MQTT connections attach and detach from this session. Buffers outgoing
55
%% messages if there is not connection attached.
66
%% @end
77

8-
%% Copyright 2018-2024 Marc Worrell
8+
%% Copyright 2018-2025 Marc Worrell
99
%%
1010
%% Licensed under the Apache License, Version 2.0 (the "License");
1111
%% you may not use this file except in compliance with the License.
@@ -42,6 +42,7 @@
4242
update_user_context/2,
4343

4444
get_transport/1,
45+
is_connected/1,
4546
kill/1,
4647
incoming_connect/3,
4748
incoming_data/2,
@@ -168,6 +169,15 @@ get_transport(Pid) ->
168169
{error, noproc}
169170
end.
170171

172+
-spec is_connected( pid() ) -> boolean().
173+
is_connected(Pid) ->
174+
try
175+
gen_server:call(Pid, is_connected, infinity)
176+
catch
177+
exit:{noproc, _} ->
178+
false
179+
end.
180+
171181
-spec kill( pid() ) -> ok.
172182
kill(Pid) when is_pid(Pid) ->
173183
MRef = monitor(process, Pid),
@@ -237,6 +247,9 @@ handle_call(get_transport, _From, #state{ transport = undefined } = State) ->
237247
handle_call(get_transport, _From, #state{ transport = Transport } = State) ->
238248
{reply, {ok, Transport}, State};
239249

250+
handle_call(is_connected, _From, #state{ is_connected = IsConnected } = State) ->
251+
{reply, IsConnected, State};
252+
240253
handle_call({incoming_data, NewData, ConnectionPid}, _From, #state{ incoming_data = Data, connection_pid = ConnectionPid } = State) ->
241254
Data1 = << Data/binary, NewData/binary >>,
242255
case handle_incoming_data(Data1, State) of
@@ -299,7 +312,7 @@ handle_info({publish_job, JobPid}, #state{ publish_jobs = Jobs } = State) when i
299312
{noreply, State1};
300313

301314
handle_info({'DOWN', _Mref, process, Pid, _Reason}, #state{ connection_pid = Pid } = State) ->
302-
State1 = do_disconnected(State),
315+
State1 = cleanup_state_disconnected(State),
303316
{noreply, State1};
304317
handle_info({'DOWN', _Mref, process, Pid, _Reason}, #state{ will_pid = Pid } = State) ->
305318
send_transport(#{
@@ -1002,12 +1015,9 @@ mark_packet_sent(PacketId, #state{ awaiting_ack = AwaitAck } = State) ->
10021015

10031016

10041017
%% @doc Called when the connection disconnects or crashes/stops
1005-
do_disconnected(#state{ will_pid = WillPid } = State) ->
1006-
mqtt_sessions_will:disconnected(WillPid),
1007-
cleanup_state_disconnected(State).
1008-
10091018
%% @todo Cleanup pending messages and awaiting states.
1010-
cleanup_state_disconnected(State) ->
1019+
cleanup_state_disconnected(#state{ will_pid = WillPid } = State) ->
1020+
mqtt_sessions_will:disconnected(WillPid),
10111021
delete_buffered_qos0(State#state{
10121022
connection_pid = undefined,
10131023
transport = undefined,
@@ -1056,7 +1066,7 @@ extract_will(#{ type := connect, will_flag := true, properties := Props } = Msg)
10561066

10571067
force_disconnect(#state{ connection_pid = undefined, transport = undefined } = State) ->
10581068
State;
1059-
force_disconnect(State) ->
1069+
force_disconnect(#state{ will_pid = WillPid } = State) ->
10601070
State1 = disconnect_transport(State),
10611071
if
10621072
is_pid(State#state.connection_pid) ->

src/mqtt_sessions_will.erl

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,18 @@
4646
user_context :: term(),
4747
session_expiry_interval :: non_neg_integer(),
4848
expiry_ref = undefined :: reference() | undefined,
49+
interval_timer_ref = undefined,
4950
timer_ref = undefined,
5051
is_stopping :: boolean()
5152
}).
5253

5354
%% The connect handshake must complete in 20 seconds.
5455
-define(CONNECT_EXPIRY_INTERVAL, 20).
5556

57+
%% Every minute we do a check with the session to see if it is connected.
58+
%% This is to catch any missed disconnects.
59+
-define(CONNECTED_CHECK_INTERVAL, 60).
60+
5661

5762
-spec start_link( atom(), pid() ) -> {ok, pid()}.
5863
start_link(Pool, SessionPid) ->
@@ -99,9 +104,11 @@ stop(Pid) ->
99104

100105
init([ Pool, SessionPid ]) ->
101106
erlang:monitor(process, SessionPid),
107+
{ok, Timer} = timer:send_interval(?CONNECTED_CHECK_INTERVAL * 1000, check_session_connected),
102108
State = #state{
103109
pool = Pool,
104110
session_pid = SessionPid,
111+
interval_timer_ref = Timer,
105112
will = #{},
106113
is_stopping = false,
107114
session_expiry_interval = 0
@@ -136,8 +143,10 @@ handle_cast(reconnected, State) ->
136143

137144
handle_cast({disconnected, IsWill, ExpiryInterval}, State) ->
138145
{noreply, do_disconnected(State, IsWill, ExpiryInterval)};
139-
handle_cast(disconnected, State) ->
146+
handle_cast(disconnected, #state{ timer_ref = undefined } = State) ->
140147
{noreply, do_disconnected(State, true, undefined)};
148+
handle_cast(disconnected, State) ->
149+
{noreply, State};
141150

142151
handle_cast({user_context, UserContext}, State) ->
143152
{noreply, State#state{ user_context = UserContext }};
@@ -158,11 +167,16 @@ handle_info({expired, Ref}, #state{ expiry_ref = Ref } = State) ->
158167
mqtt_sessions_process:kill(State#state.session_pid),
159168
do_publish_will(State),
160169
{stop, shutdown, State};
161-
handle_info({expired, Ref}, #state{ expiry_ref = Ref } = State) ->
162-
do_publish_will(State),
163-
{stop, shutdown, State};
164170
handle_info({expired, _Ref}, State) ->
165171
% old timer - ignore
172+
{noreply, State};
173+
handle_info(check_session_connected, #state{ session_pid = Pid, timer_ref = undefined } = State) ->
174+
State1 = case mqtt_sessions_process:is_connected(Pid) of
175+
true -> State;
176+
false -> do_disconnected(State, true, undefined)
177+
end,
178+
{noreply, State1};
179+
handle_info(check_session_connected, #state{} = State) ->
166180
{noreply, State}.
167181

168182
code_change(_Vsn, State, _Extra) ->

0 commit comments

Comments
 (0)