Skip to content

Commit 1b2bfe5

Browse files
committed
fix: prometheus metrics observation with qoe enabled
fixes #282
1 parent b237748 commit 1b2bfe5

File tree

1 file changed

+31
-32
lines changed

1 file changed

+31
-32
lines changed

src/emqtt_bench.erl

Lines changed: 31 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1670,38 +1670,37 @@ qoe_store_insert(Prometheus,
16701670
, handshaked := HSTs
16711671
, connected := ConnTs
16721672
} = QoE) ->
1673-
ElapsedHandshake = HSTs - StartTs,
1674-
ElapsedConn = ConnTs - StartTs,
1675-
ElapsedSub = case maps:get(subscribed, QoE, undefined) of
1676-
undefined ->
1677-
%% invalid
1678-
?invalid_elapsed;
1679-
SubTs ->
1680-
SubTs - StartTs
1681-
end,
1682-
1683-
ElapsedTCPHS = case maps:get(tcp_connected_at, QoE, undefined) of
1684-
undefined ->
1685-
%% invalid
1686-
?invalid_elapsed;
1687-
TCPTs ->
1688-
TCPTs - StartTs
1689-
end,
1690-
histogram_observe(Prometheus, mqtt_client_tcp_handshake_duration, ElapsedTCPHS),
1691-
histogram_observe(Prometheus, mqtt_client_handshake_duration, ElapsedHandshake),
1692-
histogram_observe(Prometheus, mqtt_client_connect_duration, ElapsedConn),
1693-
histogram_observe(Prometheus, mqtt_client_subscribe_duration, ElapsedSub),
1694-
1695-
Offset = erlang:time_offset(millisecond),
1696-
Term = #qoe_rec_v2{key = {ClientId, Offset + StartTs},
1697-
tcp_lat = ElapsedTCPHS,
1698-
handshake_lat = ElapsedHandshake,
1699-
connect_lat = ElapsedConn,
1700-
subscribe_lat = ElapsedSub
1701-
},
1702-
true = ets:insert(?qoe_store, Term),
1703-
ok.
1704-
1673+
CalcElapsed = fun(EndTs) when is_integer(EndTs) ->
1674+
EndTs - StartTs;
1675+
(_) ->
1676+
?invalid_elapsed
1677+
end,
1678+
1679+
ObserveElapsed = fun(Time, Metric) when is_integer(Time) ->
1680+
histogram_observe(Prometheus, Metric, Time),
1681+
Time;
1682+
(_, _) ->
1683+
?invalid_elapsed
1684+
end,
1685+
1686+
TcpConnectedAtTs = maps:get(tcp_connected_at, QoE, undefined),
1687+
ElapsedTCPHS = ObserveElapsed(CalcElapsed(TcpConnectedAtTs), mqtt_client_tcp_handshake_duration),
1688+
ElapsedHandshake = ObserveElapsed(CalcElapsed(HSTs), mqtt_client_handshake_duration),
1689+
ElapsedConn = ObserveElapsed(CalcElapsed(ConnTs), mqtt_client_connect_duration),
1690+
SubscribedTs = maps:get(subscribed, QoE, undefined),
1691+
ElapsedSub = ObserveElapsed(CalcElapsed(SubscribedTs), mqtt_client_subscribe_duration),
1692+
1693+
Offset = erlang:time_offset(millisecond),
1694+
Term = #qoe_rec_v2{
1695+
key = {ClientId, Offset + StartTs},
1696+
tcp_lat = ElapsedTCPHS,
1697+
handshake_lat = ElapsedHandshake,
1698+
connect_lat = ElapsedConn,
1699+
subscribe_lat = ElapsedSub
1700+
},
1701+
1702+
true = ets:insert(?qoe_store, Term),
1703+
ok.
17051704

17061705
open_qoe_disklog(File) ->
17071706
case disk_log:open([{name, ?QoELog}, {file, File}, {repair, true}, {type, halt},

0 commit comments

Comments
 (0)