Skip to content

Commit

Permalink
ssh: auto adjust window in client
Browse files Browse the repository at this point in the history
  • Loading branch information
u3s committed Apr 24, 2024
1 parent 928d03e commit 8c9065e
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 40 deletions.
1 change: 1 addition & 0 deletions lib/ssh/src/ssh_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1523,6 +1523,7 @@ channel_data_reply_msg(ChannelId, Connection, DataType, Data) ->
WantedSize = Size - byte_size(Data),
ssh_client_channel:cache_update(Connection#connection.channel_cache,
Channel#channel{recv_window_size = WantedSize}),
adjust_window(self(), ChannelId, byte_size(Data)),
reply_msg(Channel, Connection, {data, ChannelId, DataType, Data});
undefined ->
{[], Connection}
Expand Down
139 changes: 99 additions & 40 deletions lib/ssh/test/ssh_connection_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@
%%
-module(ssh_connection_SUITE).

-include_lib("common_test/include/ct.hrl").
-include("ssh_connect.hrl").
-include("ssh_test_lib.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("stdlib/include/assert.hrl").

-export([
suite/0,
Expand Down Expand Up @@ -58,7 +59,7 @@
connect_timeout/1,
daemon_sock_not_passive/1,
daemon_sock_not_tcp/1,
do_interrupted_send/3,
do_interrupted_send/4,
do_simple_exec/1,
encode_decode_pty_opts/1,
exec_disabled/1,
Expand All @@ -81,6 +82,7 @@
send_after_exit/1,
simple_eval/1,
simple_exec/1,
simple_exec_more_data/1,
simple_exec_sock/1,
simple_exec_two_socks/1,
small_cat/1,
Expand All @@ -97,6 +99,7 @@
start_shell_exec_direct_fun1_error_type/1,
start_shell_exec_direct_fun2/1,
start_shell_exec_direct_fun3/1,
start_shell_exec_direct_fun_more_data/1,
start_shell_exec_fun/1,
start_shell_exec_fun2/1,
start_shell_exec_fun3/1,
Expand Down Expand Up @@ -140,6 +143,7 @@ all() ->
start_shell_exec_direct_fun,
start_shell_exec_direct_fun2,
start_shell_exec_direct_fun3,
start_shell_exec_direct_fun_more_data,
start_shell_exec_direct_fun1_error,
start_shell_exec_direct_fun1_error_type,
start_exec_direct_fun1_read_write,
Expand Down Expand Up @@ -180,6 +184,7 @@ groups() ->

payload() ->
[simple_exec,
simple_exec_more_data,
simple_exec_sock,
simple_exec_two_socks,
small_cat,
Expand Down Expand Up @@ -240,6 +245,10 @@ simple_exec(Config) when is_list(Config) ->
ConnectionRef = ssh_test_lib:connect(?SSH_DEFAULT_PORT, []),
do_simple_exec(ConnectionRef).

simple_exec_more_data(Config) when is_list(Config) ->
ConnectionRef = ssh_test_lib:connect(?SSH_DEFAULT_PORT, []),
%% more data received, SSH window adjust needs to be sent by client
do_simple_exec(ConnectionRef, 60000).
%%--------------------------------------------------------------------
simple_exec_sock(_Config) ->
{ok, Sock} = ssh_test_lib:gen_tcp_connect(?SSH_DEFAULT_PORT, [{active,false}]),
Expand Down Expand Up @@ -490,9 +499,6 @@ big_cat(Config) when is_list(Config) ->
%% build 10MB binary
Data = << <<X:32>> || X <- lists:seq(1,2500000)>>,

%% pre-adjust receive window so the other end doesn't block
ssh_connection:adjust_window(ConnectionRef, ChannelId0, size(Data)),

ct:log("sending ~p byte binary~n",[size(Data)]),
ok = ssh_connection:send(ConnectionRef, ChannelId0, Data, 10000),
ok = ssh_connection:send_eof(ConnectionRef, ChannelId0),
Expand Down Expand Up @@ -633,12 +639,12 @@ ptty_alloc_pixel(Config) when is_list(Config) ->
small_interrupted_send(Config) ->
K = 1024,
M = K*K,
do_interrupted_send(Config, 10*M, 4*K).
do_interrupted_send(Config, 10*M, 4*K, {error, closed}).
interrupted_send(Config) ->
M = 1024*1024,
do_interrupted_send(Config, 10*M, 4*M).
do_interrupted_send(Config, 10*M, 4*M, ok).

do_interrupted_send(Config, SendSize, EchoSize) ->
do_interrupted_send(Config, SendSize, EchoSize, SenderResult) ->
PrivDir = proplists:get_value(priv_dir, Config),
UserDir = filename:join(PrivDir, nopubkey), % to make sure we don't use public-key-auth
file:make_dir(UserDir),
Expand Down Expand Up @@ -716,15 +722,15 @@ do_interrupted_send(Config, SendSize, EchoSize) ->
ssh:stop_daemon(Pid),
ct:log("~p:~p Check sender", [?MODULE,?LINE]),
receive
{SenderPid, {error, closed}} ->
{SenderPid, SenderResult} ->
ct:log("~p:~p {error,closed} - That's what we expect :)",[?MODULE,?LINE]),
ok;
Msg ->
ct:log("~p:~p Not expected send result: ~p",[?MODULE,?LINE,Msg]),
{fail, "Not expected msg"}
end;

{SenderPid, {error, closed}} ->
{SenderPid, SenderResult} ->
ct:log("~p:~p {error,closed} - That's what we expect, but client channel handler has not reported yet",[?MODULE,?LINE]),
receive
{ResultPid, result, Result} ->
Expand Down Expand Up @@ -1005,6 +1011,24 @@ start_shell_exec_direct_fun3(Config) ->
"testing", <<"echo foo testing">>, 0,
Config).

start_shell_exec_direct_fun_more_data(Config) ->
N = 60000,
ExpectedBin = <<"testing\n">>,
ReceiveFun =
fun(ConnectionRef, ChannelId, _Expect, _ExpectType) ->
receive_bytes(ConnectionRef, ChannelId,
N * byte_size(ExpectedBin), 0)
end,
do_start_shell_exec_fun({direct,
fun(_Cmd) ->
{ok,
[io_lib:format("testing~n",[]) ||
_ <- lists:seq(1, N)]}
end},
"not_relevant", <<"not_used\n">>, 0,
ReceiveFun,
Config).

start_shell_exec_direct_fun1_error(Config) ->
do_start_shell_exec_fun({direct, fun(_Cmd) -> {error, {bad}} end},
"testing", <<"**Error** {bad}">>, 1,
Expand Down Expand Up @@ -1139,6 +1163,28 @@ simple_eval(Inp) -> {simple_eval,Inp}.


do_start_shell_exec_fun(Fun, Command, Expect, ExpectType, Config) ->
DefaultReceiveFun =
fun(ConnectionRef, ChannelId, Expect, ExpectType) ->
receive
{ssh_cm, ConnectionRef, {data, ChannelId, ExpectType, Expect}} ->
ok
after 5000 ->
receive
Other ->
ct:log("Received other:~n~p~nExpected: ~p~n",
[Other,
{ssh_cm, ConnectionRef,
{data, ChannelId, ExpectType, Expect}}]),
%% {data, '_ChannelId', ExpectType, Expect}}]),
ct:fail("Unexpected response")
after 0 ->
ct:fail("Exec Timeout")
end
end
end,
do_start_shell_exec_fun(Fun, Command, Expect, ExpectType, DefaultReceiveFun, Config).

do_start_shell_exec_fun(Fun, Command, Expect, ExpectType, ReceiveFun, Config) ->
PrivDir = proplists:get_value(priv_dir, Config),
UserDir = filename:join(PrivDir, nopubkey), % to make sure we don't use public-key-auth
file:make_dir(UserDir),
Expand All @@ -1154,24 +1200,9 @@ do_start_shell_exec_fun(Fun, Command, Expect, ExpectType, Config) ->
{user_interaction, true},
{user_dir, UserDir}]),

{ok, ChannelId0} = ssh_connection:session_channel(ConnectionRef, infinity),

success = ssh_connection:exec(ConnectionRef, ChannelId0, Command, infinity),

receive
{ssh_cm, ConnectionRef, {data, _ChannelId, ExpectType, Expect}} ->
ok
after 5000 ->
receive
Other ->
ct:log("Received other:~n~p~nExpected: ~p~n",
[Other, {ssh_cm, ConnectionRef, {data, '_ChannelId', ExpectType, Expect}} ]),
ct:fail("Unexpected response")
after 0 ->
ct:fail("Exec Timeout")
end
end,

{ok, ChannelId} = ssh_connection:session_channel(ConnectionRef, infinity),
success = ssh_connection:exec(ConnectionRef, ChannelId, Command, infinity),
ReceiveFun(ConnectionRef, ChannelId, Expect, ExpectType),
ssh:close(ConnectionRef),
ssh:stop_daemon(Pid).

Expand Down Expand Up @@ -1732,17 +1763,26 @@ max_channels_option(Config) when is_list(Config) ->
%%--------------------------------------------------------------------
%% Internal functions ------------------------------------------------
%%--------------------------------------------------------------------

do_simple_exec(ConnectionRef) ->
do_simple_exec(ConnectionRef, 1).

do_simple_exec(ConnectionRef, N) ->
{ok, ChannelId0} = ssh_connection:session_channel(ConnectionRef, infinity),
success = ssh_connection:exec(ConnectionRef, ChannelId0,
"echo testing", infinity),
%% receive response to input
receive
{ssh_cm, ConnectionRef, {data, ChannelId0, 0, <<"testing\n">>}} ->
ok
after
10000 -> ct:fail("timeout ~p:~p",[?MODULE,?LINE])
Cmd = "yes testing | head -n " ++ integer_to_list(N),
ct:log("Cmd to be invoked over SSH shell: ~p", [Cmd]),
success = ssh_connection:exec(ConnectionRef, ChannelId0, Cmd, infinity),
ExpectedBin = <<"testing\n">>,
case N of
1 ->
%% receive response to input
receive
{ssh_cm, ConnectionRef, {data, ChannelId0, 0, ExpectedBin}} ->
ok
after
10000 -> ct:fail("timeout ~p:~p",[?MODULE,?LINE])
end;
_ ->
receive_bytes(ConnectionRef, ChannelId0, N * byte_size(ExpectedBin), 0)
end,

%% receive close messages
Expand Down Expand Up @@ -1879,8 +1919,6 @@ big_cat_rx(ConnectionRef, ChannelId) ->
big_cat_rx(ConnectionRef, ChannelId, Acc) ->
receive
{ssh_cm, ConnectionRef, {data, ChannelId, 0, Data}} ->
%% ssh_connection:adjust_window(ConnectionRef, ChannelId, size(Data)),
%% window was pre-adjusted, don't adjust again here
big_cat_rx(ConnectionRef, ChannelId, [Data | Acc]);
{ssh_cm, ConnectionRef, {eof, ChannelId}} ->
{ok, iolist_to_binary(lists:reverse(Acc))}
Expand All @@ -1898,7 +1936,6 @@ collect_data(ConnectionRef, ChannelId, EchoSize, Acc, Sum) ->
{ssh_cm, ConnectionRef, {data, ChannelId, 0, Data}} when is_binary(Data) ->
ct:log("~p:~p collect_data: received ~p bytes. total ~p bytes, want ~p more",
[?MODULE,?LINE,size(Data),Sum+size(Data),EchoSize-Sum]),
ssh_connection:adjust_window(ConnectionRef, ChannelId, size(Data)),
collect_data(ConnectionRef, ChannelId, EchoSize, [Data | Acc], Sum+size(Data));
{ssh_cm, ConnectionRef, Msg={eof, ChannelId}} ->
collect_data_report_end(Acc, Msg, EchoSize);
Expand Down Expand Up @@ -1953,3 +1990,25 @@ ssh_exec_echo(Cmd, User) ->
spawn(fun() ->
io:format("echo ~s ~s\n",[User,Cmd])
end).

receive_bytes(_, _, 0, _) ->
ct:log("ALL DATA RECEIVED Budget = 0"),
ct:log("================================ ExpectBudget = 0 (reception completed)"),
ok;
receive_bytes(ConnectionRef, ChannelId0, Budget, AccSize) when Budget > 0 ->
receive
{ssh_cm, ConnectionRef, {data, ChannelId0, 0, D}} ->
Fmt = "================================ ExpectBudget = "
"~p bytes Received/Total = ~p/~p bytes",
Args = [Budget, byte_size(D), AccSize + byte_size(D)],
ct:log(Fmt, Args),
%% ct:log(Fmt, Args),
%% ?DBG_TERM(D),
receive_bytes(ConnectionRef, ChannelId0,
Budget - byte_size(D), AccSize + byte_size(D))
after
10000 ->
ct:log("process_info(self(), messages) = ~p",
[process_info(self(), messages)]),
ct:fail("timeout ~p:~p",[?MODULE,?LINE])
end.

0 comments on commit 8c9065e

Please sign in to comment.