Skip to content

Commit

Permalink
Implement packet acks for grpc sessions (#279)
Browse files Browse the repository at this point in the history
* Implement packet acks for grpc sessions

* Clear last phash after sending

* Fix xref and eunit

* Fix ack sending not to close stream

* Put proto back to master

---------

Co-authored-by: Macpie <[email protected]>
  • Loading branch information
Vagabond and macpie authored Dec 13, 2023
1 parent 4cc0f92 commit b06b275
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 7 deletions.
2 changes: 1 addition & 1 deletion rebar.lock
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
{<<"hackney">>,{pkg,<<"hackney">>,<<"1.17.0">>},0},
{<<"helium_proto">>,
{git,"https://github.com/helium/proto.git",
{ref,"8d81732f55aa34442ada818c93f504f1a16ca659"}},
{ref,"da1c36915c46a34644e017d6779a2d56065f6cd2"}},
0},
{<<"hpack">>,{pkg,<<"hpack_erl">>,<<"0.3.0">>},2},
{<<"http2_client">>,
Expand Down
7 changes: 5 additions & 2 deletions src/grpc/packet_router/hpr_envelope_down.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@

-export_type([envelope/0]).

-spec new(hpr_packet_down:packet() | hpr_session_offer:offer() | undefined) -> envelope().
-spec new(hpr_packet_down:packet() | hpr_session_offer:offer() | hpr_packet_ack:ack() | undefined) ->
envelope().
new(undefined) ->
#envelope_down_v1_pb{data = undefined};
new(#packet_router_packet_down_v1_pb{} = Packet) ->
#envelope_down_v1_pb{data = {packet, Packet}};
new(#packet_router_session_offer_v1_pb{} = SessionOffer) ->
#envelope_down_v1_pb{data = {session_offer, SessionOffer}}.
#envelope_down_v1_pb{data = {session_offer, SessionOffer}};
new(#packet_router_packet_ack_v1_pb{} = Ack) ->
#envelope_down_v1_pb{data = {packet_ack, Ack}}.

-spec data(Env :: envelope()) ->
{packet, hpr_packet_down:packet()}
Expand Down
29 changes: 29 additions & 0 deletions src/grpc/packet_router/hpr_packet_ack.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
-module(hpr_packet_ack).

-include("../autogen/packet_router_pb.hrl").

-export([
new/1
]).

-type packet_ack() :: #packet_router_packet_ack_v1_pb{}.

-export_type([packet_ack/0]).

-spec new(PHash :: binary()) -> packet_ack().
new(PHash) ->
#packet_router_packet_ack_v1_pb{payload_hash = PHash}.

%% ------------------------------------------------------------------
%% EUnit tests
%% ------------------------------------------------------------------
-ifdef(TEST).

-include_lib("eunit/include/eunit.hrl").

new_test() ->
PHash = <<"packet_hash">>,
?assertEqual(#packet_router_packet_ack_v1_pb{payload_hash = PHash}, new(PHash)),
ok.

-endif.
28 changes: 25 additions & 3 deletions src/grpc/packet_router/hpr_packet_router_service.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
started :: non_neg_integer(),
pubkey_bin :: undefined | binary(),
nonce :: undefined | binary(),
session_key :: undefined | binary()
session_key :: undefined | binary(),
last_phash = <<>> :: binary()
}).

-spec init(atom(), grpcbox_stream:t()) -> grpcbox_stream:t().
Expand Down Expand Up @@ -68,6 +69,17 @@ handle_info({give_away, NewPid, PubKeyBin}, StreamState) ->
lager:info("give_away registration to ~p", [NewPid]),
gproc:give_away({n, l, ?REG_KEY(PubKeyBin)}, NewPid),
grpcbox_stream:send(true, hpr_envelope_down:new(undefined), StreamState);
handle_info({ack, N}, StreamState) ->
HandlerState = grpcbox_stream:stream_handler_state(StreamState),
erlang:send_after(timer:seconds(N), self(), {ack, N}),
grpcbox_stream:send(
false,
hpr_envelope_down:new(hpr_packet_ack:new(HandlerState#handler_state.last_phash)),
grpcbox_stream:stream_handler_state(
StreamState,
HandlerState#handler_state{last_phash = <<>>}
)
);
handle_info(?SESSION_KILL, StreamState0) ->
lager:debug("received session kill for stream"),
grpcbox_stream:send(true, hpr_envelope_down:new(undefined), StreamState0);
Expand Down Expand Up @@ -129,7 +141,11 @@ handle_packet(PacketUp, StreamState) ->
stream_pid => self()
},
_ = erlang:spawn_opt(hpr_routing, handle_packet, [PacketUp, Opts], [{fullsweep_after, 0}]),
{ok, StreamState}.
{ok,
grpcbox_stream:stream_handler_state(
StreamState,
HandlerState#handler_state{last_phash = hpr_packet_up:phash(PacketUp)}
)}.

-spec handle_register(Reg :: hpr_register:register(), StreamState0 :: grpcbox_stream:t()) ->
{ok, grpcbox_stream:t()}
Expand All @@ -154,6 +170,12 @@ handle_register(Reg, StreamState0) ->
StreamState1 = grpcbox_stream:stream_handler_state(
StreamState0, HandlerState#handler_state{pubkey_bin = PubKeyBin}
),
case hpr_register:packet_ack_interval(Reg) of
N when is_integer(N), N > 0 ->
erlang:send_after(timer:seconds(N), self(), {ack, N});
_ ->
ok
end,
case hpr_register:session_capable(Reg) of
true ->
{EnvDown, StreamState2} = create_session_offer(StreamState1),
Expand Down Expand Up @@ -257,7 +279,7 @@ route_packet_test() ->
meck:expect(hpr_routing, handle_packet, fun(_PacketUp, _Opts) -> ok end),

StreamState = grpcbox_stream:stream_handler_state(
#state{}, #handler_state{}
#state{}, #handler_state{last_phash = hpr_packet_up:phash(PacketUp)}
),

?assertEqual({ok, StreamState}, ?MODULE:route(EnvUp, StreamState)),
Expand Down
8 changes: 7 additions & 1 deletion src/grpc/packet_router/hpr_register.erl
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
gateway/1,
signature/1,
session_capable/1,
packet_ack_interval/1,
verify/1
]).

Expand Down Expand Up @@ -39,6 +40,10 @@ signature(Reg) ->
session_capable(Reg) ->
Reg#packet_router_register_v1_pb.session_capable.

-spec packet_ack_interval(Reg :: register()) -> non_neg_integer().
packet_ack_interval(Reg) ->
Reg#packet_router_register_v1_pb.packet_ack_interval.

-spec verify(Reg :: register()) -> boolean().
verify(Reg) ->
try
Expand Down Expand Up @@ -70,7 +75,8 @@ test_new(Map) when is_map(Map) ->
#packet_router_register_v1_pb{
timestamp = erlang:system_time(millisecond),
gateway = maps:get(gateway, Map),
session_capable = maps:get(session_capable, Map)
session_capable = maps:get(session_capable, Map),
packet_ack_interval = maps:get(packet_ack_interval, Map, 0)
}.

-spec sign(Reg :: register(), SigFun :: fun()) -> register().
Expand Down

0 comments on commit b06b275

Please sign in to comment.