From b06b2758f443c04079e74daf809a156f281162bd Mon Sep 17 00:00:00 2001 From: Andrew Thompson Date: Wed, 13 Dec 2023 15:47:24 -0800 Subject: [PATCH] Implement packet acks for grpc sessions (#279) * Implement packet acks for grpc sessions * Clear last phash after sending * Fix xref and eunit * Fix ack sending not to close stream * Put proto back to master --------- Co-authored-by: Macpie --- rebar.lock | 2 +- src/grpc/packet_router/hpr_envelope_down.erl | 7 +++-- src/grpc/packet_router/hpr_packet_ack.erl | 29 +++++++++++++++++++ .../hpr_packet_router_service.erl | 28 ++++++++++++++++-- src/grpc/packet_router/hpr_register.erl | 8 ++++- 5 files changed, 67 insertions(+), 7 deletions(-) create mode 100644 src/grpc/packet_router/hpr_packet_ack.erl diff --git a/rebar.lock b/rebar.lock index 581cd3f0..be5140c9 100644 --- a/rebar.lock +++ b/rebar.lock @@ -63,7 +63,7 @@ {<<"hackney">>,{pkg,<<"hackney">>,<<"1.17.0">>},0}, {<<"helium_proto">>, {git,"https://github.com/helium/proto.git", - {ref,"8d81732f55aa34442ada818c93f504f1a16ca659"}}, + {ref,"da1c36915c46a34644e017d6779a2d56065f6cd2"}}, 0}, {<<"hpack">>,{pkg,<<"hpack_erl">>,<<"0.3.0">>},2}, {<<"http2_client">>, diff --git a/src/grpc/packet_router/hpr_envelope_down.erl b/src/grpc/packet_router/hpr_envelope_down.erl index 14ac1592..18966868 100644 --- a/src/grpc/packet_router/hpr_envelope_down.erl +++ b/src/grpc/packet_router/hpr_envelope_down.erl @@ -11,13 +11,16 @@ -export_type([envelope/0]). --spec new(hpr_packet_down:packet() | hpr_session_offer:offer() | undefined) -> envelope(). +-spec new(hpr_packet_down:packet() | hpr_session_offer:offer() | hpr_packet_ack:ack() | undefined) -> + envelope(). new(undefined) -> #envelope_down_v1_pb{data = undefined}; new(#packet_router_packet_down_v1_pb{} = Packet) -> #envelope_down_v1_pb{data = {packet, Packet}}; new(#packet_router_session_offer_v1_pb{} = SessionOffer) -> - #envelope_down_v1_pb{data = {session_offer, SessionOffer}}. + #envelope_down_v1_pb{data = {session_offer, SessionOffer}}; +new(#packet_router_packet_ack_v1_pb{} = Ack) -> + #envelope_down_v1_pb{data = {packet_ack, Ack}}. -spec data(Env :: envelope()) -> {packet, hpr_packet_down:packet()} diff --git a/src/grpc/packet_router/hpr_packet_ack.erl b/src/grpc/packet_router/hpr_packet_ack.erl new file mode 100644 index 00000000..b859a62a --- /dev/null +++ b/src/grpc/packet_router/hpr_packet_ack.erl @@ -0,0 +1,29 @@ +-module(hpr_packet_ack). + +-include("../autogen/packet_router_pb.hrl"). + +-export([ + new/1 +]). + +-type packet_ack() :: #packet_router_packet_ack_v1_pb{}. + +-export_type([packet_ack/0]). + +-spec new(PHash :: binary()) -> packet_ack(). +new(PHash) -> + #packet_router_packet_ack_v1_pb{payload_hash = PHash}. + +%% ------------------------------------------------------------------ +%% EUnit tests +%% ------------------------------------------------------------------ +-ifdef(TEST). + +-include_lib("eunit/include/eunit.hrl"). + +new_test() -> + PHash = <<"packet_hash">>, + ?assertEqual(#packet_router_packet_ack_v1_pb{payload_hash = PHash}, new(PHash)), + ok. + +-endif. diff --git a/src/grpc/packet_router/hpr_packet_router_service.erl b/src/grpc/packet_router/hpr_packet_router_service.erl index 9f6751cd..0005f53f 100644 --- a/src/grpc/packet_router/hpr_packet_router_service.erl +++ b/src/grpc/packet_router/hpr_packet_router_service.erl @@ -22,7 +22,8 @@ started :: non_neg_integer(), pubkey_bin :: undefined | binary(), nonce :: undefined | binary(), - session_key :: undefined | binary() + session_key :: undefined | binary(), + last_phash = <<>> :: binary() }). -spec init(atom(), grpcbox_stream:t()) -> grpcbox_stream:t(). @@ -68,6 +69,17 @@ handle_info({give_away, NewPid, PubKeyBin}, StreamState) -> lager:info("give_away registration to ~p", [NewPid]), gproc:give_away({n, l, ?REG_KEY(PubKeyBin)}, NewPid), grpcbox_stream:send(true, hpr_envelope_down:new(undefined), StreamState); +handle_info({ack, N}, StreamState) -> + HandlerState = grpcbox_stream:stream_handler_state(StreamState), + erlang:send_after(timer:seconds(N), self(), {ack, N}), + grpcbox_stream:send( + false, + hpr_envelope_down:new(hpr_packet_ack:new(HandlerState#handler_state.last_phash)), + grpcbox_stream:stream_handler_state( + StreamState, + HandlerState#handler_state{last_phash = <<>>} + ) + ); handle_info(?SESSION_KILL, StreamState0) -> lager:debug("received session kill for stream"), grpcbox_stream:send(true, hpr_envelope_down:new(undefined), StreamState0); @@ -129,7 +141,11 @@ handle_packet(PacketUp, StreamState) -> stream_pid => self() }, _ = erlang:spawn_opt(hpr_routing, handle_packet, [PacketUp, Opts], [{fullsweep_after, 0}]), - {ok, StreamState}. + {ok, + grpcbox_stream:stream_handler_state( + StreamState, + HandlerState#handler_state{last_phash = hpr_packet_up:phash(PacketUp)} + )}. -spec handle_register(Reg :: hpr_register:register(), StreamState0 :: grpcbox_stream:t()) -> {ok, grpcbox_stream:t()} @@ -154,6 +170,12 @@ handle_register(Reg, StreamState0) -> StreamState1 = grpcbox_stream:stream_handler_state( StreamState0, HandlerState#handler_state{pubkey_bin = PubKeyBin} ), + case hpr_register:packet_ack_interval(Reg) of + N when is_integer(N), N > 0 -> + erlang:send_after(timer:seconds(N), self(), {ack, N}); + _ -> + ok + end, case hpr_register:session_capable(Reg) of true -> {EnvDown, StreamState2} = create_session_offer(StreamState1), @@ -257,7 +279,7 @@ route_packet_test() -> meck:expect(hpr_routing, handle_packet, fun(_PacketUp, _Opts) -> ok end), StreamState = grpcbox_stream:stream_handler_state( - #state{}, #handler_state{} + #state{}, #handler_state{last_phash = hpr_packet_up:phash(PacketUp)} ), ?assertEqual({ok, StreamState}, ?MODULE:route(EnvUp, StreamState)), diff --git a/src/grpc/packet_router/hpr_register.erl b/src/grpc/packet_router/hpr_register.erl index 1a7ab740..bc03c087 100644 --- a/src/grpc/packet_router/hpr_register.erl +++ b/src/grpc/packet_router/hpr_register.erl @@ -7,6 +7,7 @@ gateway/1, signature/1, session_capable/1, + packet_ack_interval/1, verify/1 ]). @@ -39,6 +40,10 @@ signature(Reg) -> session_capable(Reg) -> Reg#packet_router_register_v1_pb.session_capable. +-spec packet_ack_interval(Reg :: register()) -> non_neg_integer(). +packet_ack_interval(Reg) -> + Reg#packet_router_register_v1_pb.packet_ack_interval. + -spec verify(Reg :: register()) -> boolean(). verify(Reg) -> try @@ -70,7 +75,8 @@ test_new(Map) when is_map(Map) -> #packet_router_register_v1_pb{ timestamp = erlang:system_time(millisecond), gateway = maps:get(gateway, Map), - session_capable = maps:get(session_capable, Map) + session_capable = maps:get(session_capable, Map), + packet_ack_interval = maps:get(packet_ack_interval, Map, 0) }. -spec sign(Reg :: register(), SigFun :: fun()) -> register().