Skip to content

Commit

Permalink
add trailers to active_stream record to send when body is done (#123)
Browse files Browse the repository at this point in the history
  • Loading branch information
tsloughter authored Feb 5, 2018
1 parent a787f4c commit b51756f
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 3 deletions.
72 changes: 72 additions & 0 deletions src/h2_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
-export([
send_headers/3,
send_headers/4,
send_trailers/3,
send_trailers/4,
send_body/3,
send_body/4,
send_request/3,
Expand Down Expand Up @@ -245,6 +247,16 @@ send_headers(Pid, StreamId, Headers, Opts) ->
gen_statem:cast(Pid, {send_headers, StreamId, Headers, Opts}),
ok.

-spec send_trailers(pid(), stream_id(), hpack:headers()) -> ok.
send_trailers(Pid, StreamId, Trailers) ->
gen_statem:cast(Pid, {send_trailers, StreamId, Trailers, []}),
ok.

-spec send_trailers(pid(), stream_id(), hpack:headers(), send_opts()) -> ok.
send_trailers(Pid, StreamId, Trailers, Opts) ->
gen_statem:cast(Pid, {send_trailers, StreamId, Trailers, Opts}),
ok.

-spec send_body(pid(), stream_id(), binary()) -> ok.
send_body(Pid, StreamId, Body) ->
gen_statem:cast(Pid, {send_body, StreamId, Body, []}),
Expand Down Expand Up @@ -1011,7 +1023,52 @@ handle_event(_, {send_headers, StreamId, Headers, Opts},
closed ->
{keep_state, Conn}
end;
handle_event(_, {send_trailers, StreamId, Headers, _Opts},
#connection{
encode_context=EncodeContext,
streams = Streams,
socket = _Socket
}=Conn
) ->
lager:debug("[~p] {send trailers, ~p, ~p}",
[Conn#connection.type, StreamId, Headers]),
%% StreamComplete = proplists:get_value(send_end_stream, Opts, false),

Stream = h2_stream_set:get(StreamId, Streams),
case h2_stream_set:type(Stream) of
active ->
{FramesToSend, NewContext} =
h2_frame_headers:to_frames(h2_stream_set:stream_id(Stream),
Headers,
EncodeContext,
(Conn#connection.peer_settings)#settings.max_frame_size,
true
),
NewS = h2_stream_set:update_trailers(FramesToSend, Stream),
{NewSWS, NewStreams} =
h2_stream_set:send_what_we_can(
StreamId,
Conn#connection.send_window_size,
(Conn#connection.peer_settings)#settings.max_frame_size,
h2_stream_set:upsert(
h2_stream_set:update_data_queue(h2_stream_set:queued_data(Stream), false, NewS),
Conn#connection.streams)),

send_t(Stream, Headers),
{keep_state,
Conn#connection{
encode_context=NewContext,
send_window_size=NewSWS,
streams=NewStreams
}};
idle ->
%% In theory this is a client maybe activating a stream,
%% but in practice, we've already activated the stream in
%% new_stream/1
{keep_state, Conn};
closed ->
{keep_state, Conn}
end;
handle_event(_, {send_body, StreamId, Body, Opts},
#connection{}=Conn) ->
lager:debug("[~p] Send Body Stream ~p",
Expand Down Expand Up @@ -1616,6 +1673,21 @@ send_h(Stream, Headers) ->
h2_stream:send_event(Pid, {send_h, Headers})
end.

-spec send_t(
h2_stream_set:stream(),
hpack:headers()) ->
ok.
send_t(Stream, Trailers) ->
case h2_stream_set:pid(Stream) of
undefined ->
%% Should this be some kind of error?
lager:debug("tried sending trailers on a non running stream ~p",
[h2_stream_set:stream_id(Stream)]),
ok;
Pid ->
h2_stream:send_event(Pid, {send_t, Trailers})
end.

-spec recv_es(Stream :: h2_stream_set:stream(),
Conn :: connection()) ->
ok | {rst_stream, error_code()}.
Expand Down
73 changes: 73 additions & 0 deletions src/h2_stream.erl
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
request_end_stream = false :: boolean(),
request_end_headers = false :: boolean(),
response_headers = [] :: hpack:headers(),
response_trailers = [] :: hpack:headers(),
response_body :: iodata() | undefined,
response_end_headers = false :: boolean(),
response_end_stream = false :: boolean(),
Expand Down Expand Up @@ -257,6 +258,14 @@ reserved_local(cast, {send_h, Headers},
Stream#stream_state{
response_headers=Headers
}};
reserved_local(cast, {send_t, Headers},
#stream_state{
}=Stream) ->
{next_state,
half_closed_remote,
Stream#stream_state{
response_trailers=Headers
}};
reserved_local(Type, Event, State) ->
handle_event(Type, Event, State).

Expand All @@ -268,6 +277,14 @@ reserved_remote(cast, {recv_h, Headers},
Stream#stream_state{
response_headers=Headers
}};
reserved_remote(cast, {recv_t, Headers},
#stream_state{
}=Stream) ->
{next_state,
half_closed_local,
Stream#stream_state{
response_headers=Headers
}};
reserved_remote(Type, Event, State) ->
handle_event(Type, Event, State).

Expand Down Expand Up @@ -360,6 +377,24 @@ open(cast, {recv_h, Trailers},
{error, Code} ->
rst_stream_(Code, Stream)
end;
open(cast, {send_data,
{#frame_header{
type=?HEADERS,
flags=Flags
}, _}=F},
#stream_state{
socket=Socket
}=Stream) ->
sock:send(Socket, h2_frame:to_binary(F)),

NextState =
case ?IS_FLAG(Flags, ?FLAG_END_STREAM) of
true ->
half_closed_local;
_ ->
open
end,
{next_state, NextState, Stream};
open(cast, {send_data,
{#frame_header{
type=?DATA,
Expand All @@ -386,6 +421,14 @@ open(cast,
Stream#stream_state{
response_headers=Headers
}};
open(cast,
{send_t, Headers},
#stream_state{}=Stream) ->
{next_state,
half_closed_local,
Stream#stream_state{
response_trailers=Headers
}};
open(Type, Event, State) ->
handle_event(Type, Event, State).

Expand All @@ -398,6 +441,14 @@ half_closed_remote(cast,
Stream#stream_state{
response_headers=Headers
}};
half_closed_remote(cast,
{send_t, Headers},
#stream_state{}=Stream) ->
{next_state,
half_closed_remote,
Stream#stream_state{
response_trailers=Headers
}};
half_closed_remote(cast,
{send_data,
{
Expand All @@ -420,6 +471,28 @@ half_closed_remote(cast,
{error,_} ->
{next_state, closed, Stream, 0}
end;
half_closed_remote(cast,
{send_data,
{
#frame_header{
flags=Flags,
type=?HEADERS
},_
}=F}=_Msg,
#stream_state{
socket=Socket
}=Stream) ->
case sock:send(Socket, h2_frame:to_binary(F)) of
ok ->
case ?IS_FLAG(Flags, ?FLAG_END_STREAM) of
true ->
{next_state, closed, Stream, 0};
_ ->
{next_state, half_closed_remote, Stream}
end;
{error,_} ->
{next_state, closed, Stream, 0}
end;


half_closed_remote(cast, _,
Expand Down
29 changes: 26 additions & 3 deletions src/h2_stream_set.erl
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@
% hasn't allowed it to be sent yet
queued_data :: undefined | done | binary(),
% Has the body been completely recieved.
body_complete = false :: boolean()
body_complete = false :: boolean(),
trailers = undefined :: [h2_frame:frame()] | undefined
}).
-type active_stream() :: #active_stream{}.

Expand Down Expand Up @@ -132,6 +133,7 @@
-export(
[
queued_data/1,
update_trailers/2,
update_data_queue/3,
decrement_recv_window/2,
recv_window_size/1,
Expand Down Expand Up @@ -664,9 +666,16 @@ c_send_what_we_can(SWS, MFS, [S|Streams], Acc) ->
MFS :: non_neg_integer(),
Stream :: stream()) ->
{integer(), stream()}.
s_send_what_we_can(SWS, _, #active_stream{queued_data=Data}=S)
s_send_what_we_can(SWS, _, #active_stream{queued_data=Data,
trailers=undefined}=S)
when is_atom(Data) ->
{SWS, S};
s_send_what_we_can(SWS, _, #active_stream{queued_data=Data,
pid=Pid,
trailers=Trailers}=S)
when is_atom(Data) ->
[h2_stream:send_data(Pid, Frame) || Frame <- Trailers],
{SWS, S};
s_send_what_we_can(SWS, MFS, #active_stream{}=Stream) ->
%% We're coming in here with three numbers we need to look at:
%% * Connection send window size
Expand All @@ -692,7 +701,7 @@ s_send_what_we_can(SWS, MFS, #active_stream{}=Stream) ->
%% this recursion, but not the connection level

SSWS = Stream#active_stream.send_window_size,

Trailers = Stream#active_stream.trailers,
QueueSize = byte_size(Stream#active_stream.queued_data),

{MaxToSend, ExitStrategy} =
Expand Down Expand Up @@ -741,6 +750,17 @@ s_send_what_we_can(SWS, MFS, #active_stream{}=Stream) ->

_Sent = h2_stream:send_data(Stream#active_stream.pid, Frame),

case NewS of
#active_stream{trailers=undefined} ->
ok;
#active_stream{pid=Pid,
queued_data=done,
trailers=Trailers} ->
[h2_stream:send_data(Pid, Trailer) || Trailer <- Trailers];
_ ->
ok
end,

case ExitStrategy of
max_frame_size ->
s_send_what_we_can(SWS - SentBytes, MFS, NewS);
Expand Down Expand Up @@ -783,6 +803,9 @@ queued_data(#active_stream{queued_data=QD}) ->
queued_data(_) ->
undefined.

update_trailers(Trailers, Stream=#active_stream{}) ->
Stream#active_stream{trailers=Trailers}.

update_data_queue(
NewBody,
BodyComplete,
Expand Down

0 comments on commit b51756f

Please sign in to comment.