Skip to content

Commit d5c920a

Browse files
authored
Make reconcile more robust (#320)
* Make reconcile more robust * Track "weird routes" for manual intervention * Fix dialyzer
1 parent fd37c3e commit d5c920a

File tree

5 files changed

+140
-21
lines changed

5 files changed

+140
-21
lines changed

include/hpr_metrics.hrl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
-define(METRICS_ROUTES_GAUGE, "hpr_routes_gauge").
1414
-define(METRICS_EUI_PAIRS_GAUGE, "hpr_eui_pairs_gauge").
1515
-define(METRICS_SKFS_GAUGE, "hpr_skfs_gauge").
16+
-define(METRICS_WEIRD_ROUTES_GAUGE, "hpr_weird_routes_gauge").
1617
-define(METRICS_PACKET_REPORT_HISTOGRAM, "hpr_packet_report_histogram").
1718
-define(METRICS_MULTI_BUY_GET_HISTOGRAM, "hpr_multi_buy_get_histogram").
1819
-define(METRICS_FIND_ROUTES_HISTOGRAM, "hpr_find_routes_histogram").
@@ -35,6 +36,7 @@
3536
{?METRICS_ROUTES_GAUGE, prometheus_gauge, [], "Number of Routes"},
3637
{?METRICS_EUI_PAIRS_GAUGE, prometheus_gauge, [], "Number of EUI Pairs"},
3738
{?METRICS_SKFS_GAUGE, prometheus_gauge, [], "Number of SKFs"},
39+
{?METRICS_WEIRD_ROUTES_GAUGE, prometheus_gauge, [], "Number of weird routes"},
3840
{?METRICS_PACKET_REPORT_HISTOGRAM, prometheus_histogram, [status], "Packet Reports"},
3941
{?METRICS_MULTI_BUY_GET_HISTOGRAM, prometheus_histogram, [status], "Multi Buy Service Get"},
4042
{?METRICS_FIND_ROUTES_HISTOGRAM, prometheus_histogram, [], "Find Routes"},

src/cli/hpr_cli_config.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -369,7 +369,7 @@ try_refresh_route(RouteID) ->
369369
end.
370370

371371
config_route_refresh(["config", "route", "refresh", RouteID], [], _Flags) ->
372-
case hpr_route_stream_worker:refresh_route(RouteID) of
372+
case hpr_route_stream_worker:refresh_route(RouteID, 3) of
373373
{ok, RefreshMap} ->
374374
Table = [
375375
[
@@ -858,7 +858,7 @@ sync_routes([Route | Routes], ExistingRoutes, #{added := AddedRoutes} = Updates)
858858
{error, not_found} ->
859859
lager:info([{route_id, RouteID}], "syncing new route"),
860860
ok = hpr_route_storage:insert(Route),
861-
hpr_route_stream_worker:refresh_route(hpr_route:id(Route)),
861+
hpr_route_stream_worker:refresh_route(hpr_route:id(Route), 3),
862862

863863
sync_routes(
864864
Routes,

src/grpc/iot_config/hpr_route_stream_worker.erl

Lines changed: 45 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
-export([
6363
start_link/1,
6464
refresh_route/1,
65+
refresh_route/2,
6566
checkpoint/0,
6667
schedule_checkpoint/0
6768
]).
@@ -148,6 +149,19 @@ start_link(Args) ->
148149
refresh_route(RouteID) ->
149150
gen_server:call(?MODULE, {refresh_route, RouteID}, timer:seconds(300)).
150151

152+
%% Refresh a route through refresh_route/1, giving up once the attempt budget
%% `Retry' is used up.
%%
%% `Retry' is the number of attempts still allowed: with `Retry = 3' the
%% single-argument refresh is called at most three times.
%%
%% Returns:
%%   {ok, RefreshMap}                    - as soon as one attempt succeeds.
%%   {error, {retry_exhausted, RouteID}} - when `Retry' reaches 0. The reasons of
%%                                         the individual failed attempts are only
%%                                         written to the log, not returned.
%%
%% Attempts are made back to back; no delay is inserted between them here.
-spec refresh_route(hpr_route:id(), Retry :: non_neg_integer()) ->
153+
{ok, refresh_map()} | {error, any()}.
154+
%% Budget exhausted (also the result when called directly with Retry = 0).
refresh_route(RouteID, Retry) when Retry == 0 ->
155+
{error, {retry_exhausted, RouteID}};
156+
refresh_route(RouteID, Retry) ->
157+
case ?MODULE:refresh_route(RouteID) of
158+
{ok, _} = Result ->
159+
Result;
160+
{error, Reason} ->
161+
%% `Retry' logged here is the remaining budget including the attempt that just failed.
lager:error("refresh_route failed ~p retrying ~p for ~p", [Reason, Retry, RouteID]),
162+
?MODULE:refresh_route(RouteID, Retry - 1)
163+
end.
164+
151165
-spec checkpoint() -> ok.
152166
checkpoint() ->
153167
gen_server:call(?MODULE, checkpoint).
@@ -246,25 +260,36 @@ handle_call({refresh_route, RouteID}, _From, State) ->
246260
Reply =
247261
case {DevaddrResponse, EUIResponse, SKFResponse} of
248262
{{ok, {DBefore, DAfter}}, {ok, {EBefore, EAfter}}, {ok, {SBefore, SAfter}}} ->
249-
{ok, #{
250-
eui_before => length(EBefore),
251-
eui_after => length(EAfter),
252-
eui_removed => length(EBefore -- EAfter),
253-
eui_added => length(EAfter -- EBefore),
254-
%%
255-
skf_before => length(SBefore),
256-
skf_after => length(SAfter),
257-
skf_removed => length(SBefore -- SAfter),
258-
skf_added => length(SAfter -- SBefore),
259-
%%
260-
devaddr_before => length(DBefore),
261-
devaddr_after => length(DAfter),
262-
devaddr_removed => length(DBefore -- DAfter),
263-
devaddr_added => length(DAfter -- DBefore)
264-
}};
265-
{Err, _, _} when element(1, Err) == error -> Err;
266-
{_, Err, _} when element(1, Err) == error -> Err;
267-
{_, _, Err} when element(1, Err) == error -> Err;
263+
LenSAfter = length(SAfter),
264+
LenDAfter = length(DAfter),
265+
% When we have some SKF (SAfter > 0) but no Devaddrs we consider it an error
266+
case LenSAfter > 0 andalso LenDAfter == 0 of
267+
true ->
268+
{error, devaddr_empty};
269+
false ->
270+
{ok, #{
271+
eui_before => length(EBefore),
272+
eui_after => length(EAfter),
273+
eui_removed => length(EBefore -- EAfter),
274+
eui_added => length(EAfter -- EBefore),
275+
%%
276+
skf_before => length(SBefore),
277+
skf_after => LenSAfter,
278+
skf_removed => length(SBefore -- SAfter),
279+
skf_added => length(SAfter -- SBefore),
280+
%%
281+
devaddr_before => length(DBefore),
282+
devaddr_after => LenDAfter,
283+
devaddr_removed => length(DBefore -- DAfter),
284+
devaddr_added => length(DAfter -- DBefore)
285+
}}
286+
end;
287+
{{error, _} = Err, _, _} ->
288+
Err;
289+
{_, {error, _} = Err, _} ->
290+
Err;
291+
{_, _, {error, _} = Err} ->
292+
Err;
268293
Other ->
269294
{error, {unexpected_response, Other}}
270295
end,
@@ -509,6 +534,7 @@ refresh_devaddrs(RouteID) ->
509534
],
510535
{ok, {PreviousDevaddrs, Devaddrs1}};
511536
Err ->
537+
lager:error([{route_id, RouteID}], "failed to refresh route devaddrs ~p", [Err]),
512538
Err
513539
end;
514540
{error, _E} = Err ->

src/metrics/hpr_metrics.erl

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,7 @@ handle_info(?METRICS_TICK, State) ->
187187
fun record_routes/0,
188188
fun record_eui_pairs/0,
189189
fun record_skfs/0,
190+
fun record_weird_routes/0,
190191
fun record_ets/0,
191192
fun record_queues/0,
192193
fun record_devices/0
@@ -278,6 +279,36 @@ record_skfs() ->
278279
_ = prometheus_gauge:set(?METRICS_SKFS_GAUGE, [], Count),
279280
ok.
280281

282+
%% Count the "weird" routes and publish the count on ?METRICS_WEIRD_ROUTES_GAUGE.
%%
%% A route is counted as weird when its SKF table holds at least one entry while
%% hpr_devaddr_range_storage:count_for_route/1 reports 0 for it. Every such route
%% is also logged at critical level with its route id and OUI as metadata.
%%
%% Always returns ok.
-spec record_weird_routes() -> ok.
283+
record_weird_routes() ->
284+
Count = lists:foldl(
285+
fun(RouteETS, Acc) ->
286+
Route = hpr_route_ets:route(RouteETS),
287+
RouteID = hpr_route:id(Route),
288+
SKFCount =
289+
%% ets:info/2 yields `undefined' when the table does not exist; count that as no SKFs.
case ets:info(hpr_route_ets:skf_ets(RouteETS), size) of
290+
undefined -> 0;
291+
N -> N
292+
end,
293+
DevAddrRangesCount = hpr_devaddr_range_storage:count_for_route(RouteID),
294+
case SKFCount > 0 andalso DevAddrRangesCount == 0 of
295+
true ->
296+
lager:critical(
297+
[{route_id, RouteID}, {oui, hpr_route:oui(Route)}],
298+
"route has no devaddr ranges but has (~p) skfs",
299+
[SKFCount]
300+
),
301+
Acc + 1;
302+
false ->
303+
Acc
304+
end
305+
end,
306+
0,
307+
%% NOTE(review): tab2list/1 copies every row of hpr_routes_ets into a list on each call.
ets:tab2list(hpr_routes_ets)
308+
),
309+
_ = prometheus_gauge:set(?METRICS_WEIRD_ROUTES_GAUGE, [], Count),
310+
ok.
311+
281312
-spec record_grpc_connections() -> ok.
282313
record_grpc_connections() ->
283314
Opts = application:get_env(grpcbox, listen_opts, #{}),

test/hpr_route_stream_worker_SUITE.erl

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
-export([
1414
main_test/1,
1515
refresh_route_test/1,
16+
refresh_route_with_retry_test/1,
1617
stream_crash_resume_updates_test/1,
1718
reset_stream_test/1,
1819
reset_channel_test/1,
@@ -34,6 +35,7 @@ all() ->
3435
[
3536
main_test,
3637
refresh_route_test,
38+
refresh_route_with_retry_test,
3739
stream_crash_resume_updates_test,
3840
reset_stream_test,
3941
reset_channel_test,
@@ -691,6 +693,64 @@ refresh_route_test(_Config) ->
691693

692694
ok.
693695

696+
%% Checks that refreshing a route which has SKFs but no devaddr ranges fails with
%% `devaddr_empty', that refresh_route/2 turns repeated failures into
%% `retry_exhausted', and that the refresh succeeds once a devaddr range is served.
refresh_route_with_retry_test(_Config) ->
697+
%% Build a route, one EUI pair and one SKF for it; no devaddr range is created yet
698+
Route1ID = "7d502f32-4d58-4746-965e-002",
699+
Route1 = hpr_route:test_new(#{
700+
id => Route1ID,
701+
net_id => 0,
702+
oui => 1,
703+
server => #{
704+
host => "localhost",
705+
port => 8080,
706+
protocol => {packet_router, #{}}
707+
},
708+
max_copies => 10
709+
}),
710+
EUIPair1 = hpr_eui_pair:test_new(#{route_id => Route1ID, app_eui => 1, dev_eui => 0}),
711+
DevAddr1 = 16#00000001,
712+
SessionKey1 = hpr_utils:bin_to_hex_string(crypto:strong_rand_bytes(16)),
713+
SessionKeyFilter1 = hpr_skf:new(#{
714+
route_id => Route1ID, devaddr => DevAddr1, session_key => SessionKey1, max_copies => 1
715+
}),
716+
717+
% Push the route, EUI pair and SKF through the test route stream
718+
ok = hpr_test_ics_route_service:stream_resp(
719+
hpr_route_stream_res:test_new(#{action => add, data => {route, Route1}})
720+
),
721+
ok = hpr_test_ics_route_service:stream_resp(
722+
hpr_route_stream_res:test_new(#{action => add, data => {eui_pair, EUIPair1}})
723+
),
724+
ok = hpr_test_ics_route_service:stream_resp(
725+
hpr_route_stream_res:test_new(#{action => add, data => {skf, SessionKeyFilter1}})
726+
),
727+
728+
% Presumably (RouteID, routes, EUI pairs, devaddr ranges, SKFs) - TODO confirm against check_config_counts/5
ok = check_config_counts(Route1ID, 1, 1, 0, 1),
729+
730+
% Set the EUI pairs and SKFs for the refresh; presumably read by the test ICS service - confirm.
% test_route_get_devaddr_ranges is deliberately not set at this point.
731+
application:set_env(hpr, test_route_get_euis, [EUIPair1]),
732+
application:set_env(hpr, test_route_list_skfs, [SessionKeyFilter1]),
733+
734+
% A single refresh is rejected: SKFs are present but no devaddrs come back
?assertEqual(
735+
{error, devaddr_empty},
736+
hpr_route_stream_worker:refresh_route(Route1ID)
737+
),
738+
739+
% With a budget of 3 attempts every attempt fails, so the retry wrapper gives up
?assertEqual(
740+
{error, {retry_exhausted, Route1ID}},
741+
hpr_route_stream_worker:refresh_route(Route1ID, 3)
742+
),
743+
744+
% Serve a DevAddrRange for the route; the refresh is now expected to succeed
745+
DevAddrRange1 = hpr_devaddr_range:test_new(#{
746+
route_id => Route1ID, start_addr => 16#00000001, end_addr => 16#0000000A
747+
}),
748+
application:set_env(hpr, test_route_get_devaddr_ranges, [DevAddrRange1]),
749+
{ok, _} = hpr_route_stream_worker:refresh_route(Route1ID),
750+
751+
% Same counts as before except the third value, which goes from 0 to 1
ok = check_config_counts(Route1ID, 1, 1, 1, 1),
752+
% NOTE(review): the hpr env values set above are not reset in this function - confirm cleanup happens elsewhere.
ok.
753+
694754
%% ===================================================================
695755
%% Helpers
696756
%% ===================================================================

0 commit comments

Comments
 (0)