Skip to content

Commit a8563f3

Browse files
authored
Better buffering and ack cleanup (#20)
1 parent a92c8e0 commit a8563f3

File tree

1 file changed

+135
-44
lines changed

1 file changed

+135
-44
lines changed

src/mqtt_sessions_process.erl

Lines changed: 135 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
%% @doc Process handling one single MQTT session.
22
%% Transports attach to and detach from this session.
33
%% @author Marc Worrell <[email protected]>
4-
%% @copyright 2018-2022 Marc Worrell
4+
%% @copyright 2018-2024 Marc Worrell
55

6-
%% Copyright 2018-2022 Marc Worrell
6+
%% Copyright 2018-2024 Marc Worrell
77
%%
88
%% Licensed under the Apache License, Version 2.0 (the "License");
99
%% you may not use this file except in compliance with the License.
@@ -60,8 +60,8 @@
6060
-define(SESSION_EXPIRY_DEFAULT, 3600). % Maximum allowed session expiration
6161
-define(MESSAGE_EXPIRY_DEFAULT, 3600).
6262

63-
-define(MAX_INFLIGHT, 500). % Max in-flight messages for any QoS
64-
-define(MAX_INFLIGHT_ACK, 100). % Max in-flight messages waiting with QoS 1 or 2
63+
-define(MAX_QUEUED, 500). % Max pending messages for any QoS
64+
-define(MAX_INFLIGHT_ACK, 250). % Max in-flight or pending messages waiting with QoS 1 or 2
6565

6666

6767
-define(KILL_TIMEOUT, 5000).
@@ -105,7 +105,7 @@
105105
-record(queued, {
106106
type :: atom(),
107107
msg_nr :: pos_integer(),
108-
packet_id = undefined :: undefined | non_neg_integer(),
108+
packet_id = undefined :: undefined | packet_id(),
109109
queued :: non_neg_integer(),
110110
expiry :: non_neg_integer(),
111111
qos = 0 :: 0..2,
@@ -240,11 +240,21 @@ handle_call({incoming_data, NewData, ConnectionPid}, _From, #state{ incoming_dat
240240
{reply, ok, StateRest#state{ keep_alive_counter = 3, incoming_data = Rest }};
241241
{error, Reason} when is_atom(Reason) ->
242242
% illegal packet, disconnect and wait for new connection
243-
?LOG_INFO("Error decoding incoming data: ~p", [ Reason ]),
243+
?LOG_WARNING(#{
244+
in => mqtt_sessions,
245+
text => <<"Error decoding incoming data - disconnecting">>,
246+
result => error,
247+
reason => Reason
248+
}),
244249
{reply, {error, Reason}, force_disconnect(State)}
245250
end;
246251
handle_call({incoming_data, _NewData, ConnectionPid}, _From, State) ->
247-
?LOG_DEBUG("MQTT session incoming data from ~p, expected from ~p", [ConnectionPid, State#state.connection_pid]),
252+
?LOG_DEBUG(#{
253+
in => mqtt_sessions,
254+
text => <<"MQTT session incoming data from unexpected Pid">>,
255+
from_pid => ConnectionPid,
256+
expected_pid => State#state.connection_pid
257+
}),
248258
{reply, {error, wrong_connection}, State};
249259
handle_call(Cmd, _From, State) ->
250260
{stop, {unknown_cmd, Cmd}, State}.
@@ -303,7 +313,11 @@ handle_info({'DOWN', _Mref, process, Pid, _Reason}, State) ->
303313
{noreply, State1};
304314

305315
handle_info(Info, State) ->
306-
?LOG_INFO("Unknown info message ~p", [Info]),
316+
?LOG_INFO(#{
317+
in => mqtt_sessions,
318+
text => <<"Ignored unknown info message">>,
319+
info_msg => Info
320+
}),
307321
{noreply, State}.
308322

309323
code_change(_Vsn, State, _Extra) ->
@@ -354,13 +368,29 @@ handle_incoming(#{ type := connect } = Msg, Options, State) ->
354368
handle_incoming(#{ type := auth } = Msg, _Options, State) ->
355369
packet_connect_auth(Msg, State);
356370
handle_incoming(#{ type := Type }, _Options, #state{ connection_pid = undefined } = State) ->
357-
?LOG_INFO("Dropping packet for MQTT session ~p ~s (~p) for receiving ~p when not connected.",
358-
[State#state.pool, State#state.client_id, self(), Type]),
371+
?LOG_INFO(#{
372+
in => mqtt_sessions,
373+
text => <<"Dropping packet for MQTT session when not connected.">>,
374+
result => error,
375+
reason => not_connected,
376+
pool => State#state.pool,
377+
client_id => State#state.client_id,
378+
session_pid => self(),
379+
message_type => Type
380+
}),
359381
{error, not_connected};
360382
handle_incoming(#{ type := Type }, _Options, #state{ is_session_present = false } = State) ->
361383
% Only AUTH and CONNECT before the CONNACK
362-
?LOG_INFO("Killing MQTT session ~p ~s (~p) for receiving ~p when no session started.",
363-
[State#state.pool, State#state.client_id, self(), Type]),
384+
?LOG_INFO(#{
385+
in => mqtt_sessions,
386+
text => <<"MQTT received non AUTH or CONNECT before CONNACK - killed session">>,
387+
result => error,
388+
reason => no_connack,
389+
pool => State#state.pool,
390+
client_id => State#state.client_id,
391+
session_pid => self(),
392+
message_type => Type
393+
}),
364394
{stop, State};
365395
handle_incoming(#{ type := publish } = Msg, _Options, State) ->
366396
packet_publish(Msg, State);
@@ -392,7 +422,11 @@ handle_incoming(#{ type := disconnect } = Msg, _Options, State) ->
392422
packet_disconnect(Msg, State);
393423

394424
handle_incoming(#{ type := Type }, _Options, State) ->
395-
?LOG_INFO("MQTT dropping unhandled packet with type ~p", [Type]),
425+
?LOG_INFO(#{
426+
in => mqtt_sessions,
427+
text => <<"MQTT dropping unhandled packet with type">>,
428+
message_type => Type
429+
}),
396430
{ok, State}.
397431

398432
% ---------------------------------------------------------------------------------------
@@ -458,8 +492,8 @@ handle_connect_auth(Msg, Options, StateIfAccept, #state{ runtime = Runtime, is_s
458492
handle_connect_auth_1({ok, #{ type := connack, reason_code := ?MQTT_RC_SUCCESS } = ConnAck, UserContext1},
459493
#{ clean_start := CleanStart }, StateIfAccept, #state{ is_session_present = IsSessionPresent }) ->
460494
StateCleaned = maybe_clean_start(CleanStart, StateIfAccept),
461-
462-
%% Set the session_present flag to true, when the runtime omitted it, and when there is a
495+
496+
%% Set the session_present flag to true, when the runtime omitted it, and when there is a
463497
%% session present.
464498
ConnAck1 = case maps:find(session_present, ConnAck) of
465499
{ok, _} -> ConnAck;
@@ -480,7 +514,14 @@ handle_connect_auth_1({ok, #{ type := connack, reason_code := ?MQTT_RC_SUCCESS }
480514
{ok, State3};
481515
handle_connect_auth_1({ok, #{ type := connack, reason_code := ReasonCode } = ConnAck, _UserContext1}, _Msg, StateIfAccept, _State) ->
482516
_ = reply_connack(ConnAck, StateIfAccept),
483-
?LOG_DEBUG("MQTT connect/auth refused (~p): ~p", [ReasonCode, ConnAck]),
517+
?LOG_INFO(#{
518+
in => mqtt_sessions,
519+
text => <<"MQTT connect/auth refused">>,
520+
result => error,
521+
reason => connection_refused,
522+
reason_code => ReasonCode,
523+
connack => ConnAck
524+
}),
484525
{error, connection_refused};
485526
handle_connect_auth_1({ok, #{ type := auth } = Auth, UserContext1}, _Msg, StateIfAccept, _State) ->
486527
State1 = StateIfAccept#state{
@@ -491,7 +532,14 @@ handle_connect_auth_1({ok, #{ type := auth } = Auth, UserContext1}, _Msg, StateI
491532
State2#state.session_expiry_interval, State2#state.user_context),
492533
{ok, State2};
493534
handle_connect_auth_1({error, Reason}, Msg, _StateIfAccept, _State) ->
494-
?LOG_INFO("MQTT connect/auth refused (~p): ~p", [Reason, Msg]),
535+
?LOG_INFO(#{
536+
in => mqtt_sessions,
537+
text => <<"MQTT connect/auth refused">>,
538+
result => error,
539+
reason => connection_refused,
540+
msg_reason => Reason,
541+
message => Msg
542+
}),
495543
{error, connection_refused}.
496544

497545

@@ -626,8 +674,12 @@ packet_pubrel(#{ packet_id := PacketId, reason_code := ?MQTT_RC_SUCCESS }, #stat
626674
end;
627675
packet_pubrel(#{ packet_id := PacketId, reason_code := RC }, #state{ awaiting_rel = WaitRel } = State) ->
628676
% Error server/client out of sync - remove the wait-rel for this packet_id
629-
?LOG_INFO("PUBREL with reason ~p for packet ~p",
630-
[ RC, PacketId ]),
677+
?LOG_INFO(#{
678+
in => mqtt_sessions,
679+
text => <<"PUBREL with non success reason for packet">>,
680+
reason_code => RC,
681+
packet_id => PacketId
682+
}),
631683
WaitRel1 = maps:remove(PacketId, WaitRel),
632684
{ok, State#state{ awaiting_rel = WaitRel1 }}.
633685

@@ -638,8 +690,14 @@ packet_puback(#{ packet_id := PacketId }, #state{ awaiting_ack = WaitAck } = Sta
638690
{ok, {_MsgNr, puback, _Msg}} ->
639691
maps:remove(PacketId, WaitAck);
640692
{ok, {_MsgNr, Wait, Msg}} ->
641-
?LOG_WARNING("PUBACK for message ~p waiting for ~p. Message: ~p",
642-
[ PacketId, Wait, Msg ]),
693+
?LOG_WARNING(#{
694+
in => mqtt_sessions,
695+
text => <<"PUBACK for message wating for something else - dropping pending ack">>,
696+
result => error,
697+
packet_id => PacketId,
698+
wait => Wait,
699+
message => Msg
700+
}),
643701
maps:remove(PacketId, WaitAck);
644702
error ->
645703
WaitAck
@@ -654,8 +712,14 @@ packet_pubrec(#{ packet_id := PacketId, reason_code := RC }, #state{ awaiting_ac
654712
{ok, {_MsgNr, pubcomp, _Msg}} ->
655713
maps:remove(PacketId, WaitAck);
656714
{ok, {_MsgNr, Wait, Msg}} ->
657-
?LOG_WARNING("PUBREC for message ~p waiting for ~p. Message: ~p",
658-
[ PacketId, Wait, Msg ]),
715+
?LOG_WARNING(#{
716+
in => mqtt_sessions,
717+
text => <<"PUBREC for message wating for something else - dropping pending ack">>,
718+
result => error,
719+
packet_id => PacketId,
720+
wait => Wait,
721+
message => Msg
722+
}),
659723
maps:remove(PacketId, WaitAck);
660724
error ->
661725
WaitAck
@@ -668,8 +732,14 @@ packet_pubrec(#{ packet_id := PacketId }, #state{ awaiting_ack = WaitAck } = Sta
668732
{ok, {_MsgNr, pubcomp, _Msg}} ->
669733
{WaitAck, ?MQTT_RC_SUCCESS};
670734
{ok, {_MsgNr, Wait, Msg}} ->
671-
?LOG_WARNING("PUBREC for message ~p waiting for ~p. Message: ~p",
672-
[ PacketId, Wait, Msg ]),
735+
?LOG_WARNING(#{
736+
in => mqtt_sessions,
737+
text => <<"PUBREC for message wating for something else - dropping pending ack">>,
738+
result => error,
739+
packet_id => PacketId,
740+
wait => Wait,
741+
message => Msg
742+
}),
673743
{maps:remove(PacketId, WaitAck), ?MQTT_RC_PACKET_ID_NOT_FOUND};
674744
error ->
675745
{WaitAck, ?MQTT_RC_PACKET_ID_NOT_FOUND}
@@ -688,8 +758,14 @@ packet_pubcomp(#{ packet_id := PacketId }, #state{ awaiting_ack = WaitAck } = St
688758
{ok, {_MsgNr, pubcomp, _Msg}} ->
689759
maps:remove(PacketId, WaitAck);
690760
{ok, {_MsgNr, Wait, Msg}} ->
691-
?LOG_WARNING("PUBREC for message ~p waiting for ~p. Message: ~p",
692-
[ PacketId, Wait, Msg ]),
761+
?LOG_WARNING(#{
762+
in => mqtt_sessions,
763+
text => <<"PUBCOMP for message wating for something else - dropping pending ack">>,
764+
result => error,
765+
packet_id => PacketId,
766+
wait => Wait,
767+
message => Msg
768+
}),
693769
maps:remove(PacketId, WaitAck);
694770
error ->
695771
WaitAck
@@ -786,11 +862,11 @@ relay_publish(#{ type := publish, message := Msg } = MqttMsg, State) ->
786862
0 ->
787863
reply(Msg2#{ packet_id => 0 }, StatePurged);
788864
_ ->
789-
case maps:size(StatePurged#state.awaiting_ack) > ?MAX_INFLIGHT_ACK of
865+
case maps:size(StatePurged#state.awaiting_ack) >= ?MAX_INFLIGHT_ACK of
790866
true ->
791-
?LOG_DEBUG(#{
867+
?LOG_INFO(#{
792868
in => mqtt_session,
793-
text => <<"Dropping QoS 1/2 message, too many inflight acks">>,
869+
text => <<"Not accepting QoS 1/2 message, too many inflight or queued acks">>,
794870
result => error,
795871
reason => buffer_full
796872
}),
@@ -932,15 +1008,13 @@ disconnect_transport(#state{ transport = Transport } = State) when is_function(T
9321008
reply(undefined, State) ->
9331009
State;
9341010
reply(Msg, #state{ transport = undefined } = State) ->
935-
State1 = maybe_purge(State),
936-
queue(Msg, State1);
1011+
maybe_purge( queue(Msg, State) );
9371012
reply(Msg, State) ->
938-
State1 = maybe_purge(State),
939-
case send_transport(Msg, State1) of
1013+
case send_transport(Msg, State) of
9401014
ok ->
941-
State1;
1015+
State;
9421016
{error, _} ->
943-
queue(Msg, State1#state{ transport = undefined })
1017+
maybe_purge( queue(Msg, State#state{ transport = undefined }) )
9441018
end.
9451019

9461020
send_transport(_Msg, #state{ transport = undefined }) ->
@@ -974,26 +1048,45 @@ queue_1(#{ type := Type } = Msg, #state{ msg_nr = MsgNr, pending = Pending } = S
9741048
msg_nr = MsgNr,
9751049
type = Type,
9761050
queued = Now,
1051+
packet_id = maps:get(packet_id, Msg, 0),
9771052
expiry = Now + maps:get(message_expiry_interval, Props, ?MESSAGE_EXPIRY_DEFAULT),
9781053
qos = maps:get(qos, Msg, 1),
9791054
message = Msg
9801055
},
981-
State#state{ pending = queue:in(Item, Pending)}.
1056+
State#state{ pending = queue:in(Item, Pending) }.
9821057

9831058
maybe_purge(#state{ pending = Queue, awaiting_ack = WaitAcks } = State) ->
984-
case queue:len(Queue) > ?MAX_INFLIGHT orelse maps:size(WaitAcks) > ?MAX_INFLIGHT_ACK of
1059+
case queue:len(Queue) > ?MAX_QUEUED orelse maps:size(WaitAcks) > ?MAX_INFLIGHT_ACK of
9851060
true ->
1061+
PacketIdsBefore = queue:fold(
1062+
fun
1063+
(#queued{ qos = 0 }, Acc) -> Acc;
1064+
(#queued{ packet_id = PacketId }, Acc) -> [ PacketId | Acc ]
1065+
end,
1066+
[],
1067+
Queue),
9861068
PurgedQueue = purge(Queue),
987-
PacketIds = queue:fold(
1069+
PacketIdsAfter = queue:fold(
9881070
fun
9891071
(#queued{ qos = 0 }, Acc) -> Acc;
9901072
(#queued{ packet_id = PacketId }, Acc) -> [ PacketId | Acc ]
9911073
end,
9921074
[],
9931075
PurgedQueue),
1076+
PurgedPacketIds = PacketIdsBefore -- PacketIdsAfter,
1077+
PurgedWaitAcks = maps:without(PurgedPacketIds, WaitAcks),
1078+
?LOG_DEBUG(#{
1079+
in => mqtt_sessions,
1080+
text => <<"Purged pending messages">>,
1081+
result => ok,
1082+
pending_before => queue:len(Queue),
1083+
pending_after => queue:len(PurgedQueue),
1084+
dropped_acks => length(PurgedPacketIds),
1085+
pending_acks => maps:size(PurgedWaitAcks)
1086+
}),
9941087
State#state{
9951088
pending = PurgedQueue,
996-
awaiting_ack = maps:with(PacketIds, WaitAcks)
1089+
awaiting_ack = PurgedWaitAcks
9971090
};
9981091
false ->
9991092
State
@@ -1002,10 +1095,8 @@ maybe_purge(#state{ pending = Queue, awaiting_ack = WaitAcks } = State) ->
10021095
purge(Queue) ->
10031096
{value, #queued{ queued = Oldest }} = queue:peek(Queue),
10041097
{value, #queued{ queued = Newest }} = queue:peek_r(Queue),
1005-
10061098
PurgeTime = mqtt_sessions_timestamp:timestamp(),
1007-
QoS0PurgeAge = (Newest - Oldest) / 2,
1008-
1099+
QoS0PurgeAge = (Newest - Oldest) div 2,
10091100
Queue1 = queue:filter(
10101101
fun
10111102
(#queued{ qos = 0, queued = Queued, expiry = Expiry }) ->
@@ -1014,7 +1105,7 @@ purge(Queue) ->
10141105
PurgeTime < Expiry
10151106
end,
10161107
Queue),
1017-
case queue:len(Queue1) > ?MAX_INFLIGHT of
1108+
case queue:len(Queue1) > ?MAX_QUEUED of
10181109
true ->
10191110
% Drop all QoS 0 messages
10201111
queue:filter(fun(#queued{ qos = QoS }) -> QoS > 0 end, Queue1);

0 commit comments

Comments
 (0)