From b51756fa78711b858203667e4bfa94bc3716eedc Mon Sep 17 00:00:00 2001 From: Tristan Sloughter Date: Mon, 5 Feb 2018 13:15:23 -0800 Subject: [PATCH] add trailers to active_stream record to send when body is done (#123) --- src/h2_connection.erl | 72 ++++++++++++++++++++++++++++++++++++++++++ src/h2_stream.erl | 73 +++++++++++++++++++++++++++++++++++++++++++ src/h2_stream_set.erl | 29 +++++++++++++++-- 3 files changed, 171 insertions(+), 3 deletions(-) diff --git a/src/h2_connection.erl b/src/h2_connection.erl index 82211507..4bb40ba9 100644 --- a/src/h2_connection.erl +++ b/src/h2_connection.erl @@ -17,6 +17,8 @@ -export([ send_headers/3, send_headers/4, + send_trailers/3, + send_trailers/4, send_body/3, send_body/4, send_request/3, @@ -245,6 +247,16 @@ send_headers(Pid, StreamId, Headers, Opts) -> gen_statem:cast(Pid, {send_headers, StreamId, Headers, Opts}), ok. +-spec send_trailers(pid(), stream_id(), hpack:headers()) -> ok. +send_trailers(Pid, StreamId, Trailers) -> + gen_statem:cast(Pid, {send_trailers, StreamId, Trailers, []}), + ok. + +-spec send_trailers(pid(), stream_id(), hpack:headers(), send_opts()) -> ok. +send_trailers(Pid, StreamId, Trailers, Opts) -> + gen_statem:cast(Pid, {send_trailers, StreamId, Trailers, Opts}), + ok. + -spec send_body(pid(), stream_id(), binary()) -> ok. send_body(Pid, StreamId, Body) -> gen_statem:cast(Pid, {send_body, StreamId, Body, []}), @@ -1011,7 +1023,52 @@ handle_event(_, {send_headers, StreamId, Headers, Opts}, closed -> {keep_state, Conn} end; +handle_event(_, {send_trailers, StreamId, Headers, _Opts}, + #connection{ + encode_context=EncodeContext, + streams = Streams, + socket = _Socket + }=Conn + ) -> + lager:debug("[~p] {send trailers, ~p, ~p}", + [Conn#connection.type, StreamId, Headers]), + %% StreamComplete = proplists:get_value(send_end_stream, Opts, false), + + Stream = h2_stream_set:get(StreamId, Streams), + case h2_stream_set:type(Stream) of + active -> + {FramesToSend, NewContext} = + h2_frame_headers:to_frames(h2_stream_set:stream_id(Stream), + Headers, + EncodeContext, + (Conn#connection.peer_settings)#settings.max_frame_size, + true + ), + NewS = h2_stream_set:update_trailers(FramesToSend, Stream), + {NewSWS, NewStreams} = + h2_stream_set:send_what_we_can( + StreamId, + Conn#connection.send_window_size, + (Conn#connection.peer_settings)#settings.max_frame_size, + h2_stream_set:upsert( + h2_stream_set:update_data_queue(h2_stream_set:queued_data(Stream), false, NewS), + Conn#connection.streams)), + send_t(Stream, Headers), + {keep_state, + Conn#connection{ + encode_context=NewContext, + send_window_size=NewSWS, + streams=NewStreams + }}; + idle -> + %% In theory this is a client maybe activating a stream, + %% but in practice, we've already activated the stream in + %% new_stream/1 + {keep_state, Conn}; + closed -> + {keep_state, Conn} + end; handle_event(_, {send_body, StreamId, Body, Opts}, #connection{}=Conn) -> lager:debug("[~p] Send Body Stream ~p", @@ -1616,6 +1673,21 @@ send_h(Stream, Headers) -> h2_stream:send_event(Pid, {send_h, Headers}) end. +-spec send_t( + h2_stream_set:stream(), + hpack:headers()) -> + ok. +send_t(Stream, Trailers) -> + case h2_stream_set:pid(Stream) of + undefined -> + %% Should this be some kind of error? + lager:debug("tried sending trailers on a non running stream ~p", + [h2_stream_set:stream_id(Stream)]), + ok; + Pid -> + h2_stream:send_event(Pid, {send_t, Trailers}) + end. + -spec recv_es(Stream :: h2_stream_set:stream(), Conn :: connection()) -> ok | {rst_stream, error_code()}. diff --git a/src/h2_stream.erl b/src/h2_stream.erl index 878b2365..6af7ec0b 100644 --- a/src/h2_stream.erl +++ b/src/h2_stream.erl @@ -55,6 +55,7 @@ request_end_stream = false :: boolean(), request_end_headers = false :: boolean(), response_headers = [] :: hpack:headers(), + response_trailers = [] :: hpack:headers(), response_body :: iodata() | undefined, response_end_headers = false :: boolean(), response_end_stream = false :: boolean(), @@ -257,6 +258,14 @@ reserved_local(cast, {send_h, Headers}, Stream#stream_state{ response_headers=Headers }}; +reserved_local(cast, {send_t, Headers}, + #stream_state{ + }=Stream) -> + {next_state, + half_closed_remote, + Stream#stream_state{ + response_trailers=Headers + }}; reserved_local(Type, Event, State) -> handle_event(Type, Event, State). @@ -268,6 +277,14 @@ reserved_remote(cast, {recv_h, Headers}, Stream#stream_state{ response_headers=Headers }}; +reserved_remote(cast, {recv_t, Headers}, + #stream_state{ + }=Stream) -> + {next_state, + half_closed_local, + Stream#stream_state{ + response_headers=Headers + }}; reserved_remote(Type, Event, State) -> handle_event(Type, Event, State). @@ -360,6 +377,24 @@ open(cast, {recv_h, Trailers}, {error, Code} -> rst_stream_(Code, Stream) end; +open(cast, {send_data, + {#frame_header{ + type=?HEADERS, + flags=Flags + }, _}=F}, + #stream_state{ + socket=Socket + }=Stream) -> + sock:send(Socket, h2_frame:to_binary(F)), + + NextState = + case ?IS_FLAG(Flags, ?FLAG_END_STREAM) of + true -> + half_closed_local; + _ -> + open + end, + {next_state, NextState, Stream}; open(cast, {send_data, {#frame_header{ type=?DATA, @@ -386,6 +421,14 @@ open(cast, Stream#stream_state{ response_headers=Headers }}; +open(cast, + {send_t, Headers}, + #stream_state{}=Stream) -> + {next_state, + half_closed_local, + Stream#stream_state{ + response_trailers=Headers + }}; open(Type, Event, State) -> handle_event(Type, Event, State). @@ -398,6 +441,14 @@ half_closed_remote(cast, Stream#stream_state{ response_headers=Headers }}; +half_closed_remote(cast, + {send_t, Headers}, + #stream_state{}=Stream) -> + {next_state, + half_closed_remote, + Stream#stream_state{ + response_trailers=Headers + }}; half_closed_remote(cast, {send_data, { @@ -420,6 +471,28 @@ half_closed_remote(cast, {error,_} -> {next_state, closed, Stream, 0} end; +half_closed_remote(cast, + {send_data, + { + #frame_header{ + flags=Flags, + type=?HEADERS + },_ + }=F}=_Msg, + #stream_state{ + socket=Socket + }=Stream) -> + case sock:send(Socket, h2_frame:to_binary(F)) of + ok -> + case ?IS_FLAG(Flags, ?FLAG_END_STREAM) of + true -> + {next_state, closed, Stream, 0}; + _ -> + {next_state, half_closed_remote, Stream} + end; + {error,_} -> + {next_state, closed, Stream, 0} + end; half_closed_remote(cast, _, diff --git a/src/h2_stream_set.erl b/src/h2_stream_set.erl index 73bb73ae..3e0219f1 100644 --- a/src/h2_stream_set.erl +++ b/src/h2_stream_set.erl @@ -79,7 +79,8 @@ % hasn't allowed it to be sent yet queued_data :: undefined | done | binary(), % Has the body been completely recieved. - body_complete = false :: boolean() + body_complete = false :: boolean(), + trailers = undefined :: [h2_frame:frame()] | undefined }). -type active_stream() :: #active_stream{}. @@ -132,6 +133,7 @@ -export( [ queued_data/1, + update_trailers/2, update_data_queue/3, decrement_recv_window/2, recv_window_size/1, @@ -664,9 +666,16 @@ c_send_what_we_can(SWS, MFS, [S|Streams], Acc) -> MFS :: non_neg_integer(), Stream :: stream()) -> {integer(), stream()}. -s_send_what_we_can(SWS, _, #active_stream{queued_data=Data}=S) +s_send_what_we_can(SWS, _, #active_stream{queued_data=Data, + trailers=undefined}=S) when is_atom(Data) -> {SWS, S}; +s_send_what_we_can(SWS, _, #active_stream{queued_data=Data, + pid=Pid, + trailers=Trailers}=S) + when is_atom(Data) -> + [h2_stream:send_data(Pid, Frame) || Frame <- Trailers], + {SWS, S}; s_send_what_we_can(SWS, MFS, #active_stream{}=Stream) -> %% We're coming in here with three numbers we need to look at: %% * Connection send window size @@ -692,7 +701,7 @@ s_send_what_we_can(SWS, MFS, #active_stream{}=Stream) -> %% this recursion, but not the connection level SSWS = Stream#active_stream.send_window_size, - + Trailers = Stream#active_stream.trailers, QueueSize = byte_size(Stream#active_stream.queued_data), {MaxToSend, ExitStrategy} = @@ -741,6 +750,17 @@ s_send_what_we_can(SWS, MFS, #active_stream{}=Stream) -> _Sent = h2_stream:send_data(Stream#active_stream.pid, Frame), + case NewS of + #active_stream{trailers=undefined} -> + ok; + #active_stream{pid=Pid, + queued_data=done, + trailers=Trailers} -> + [h2_stream:send_data(Pid, Trailer) || Trailer <- Trailers]; + _ -> + ok + end, + case ExitStrategy of max_frame_size -> s_send_what_we_can(SWS - SentBytes, MFS, NewS); @@ -783,6 +803,9 @@ queued_data(#active_stream{queued_data=QD}) -> queued_data(_) -> undefined. +update_trailers(Trailers, Stream=#active_stream{}) -> + Stream#active_stream{trailers=Trailers}. + update_data_queue( NewBody, BodyComplete,