Skip to content

Commit 14a6ac1

Browse files
committed
ssh: auto adjust window in client
1 parent 0863bd3 commit 14a6ac1

File tree

2 files changed

+114
-61
lines changed

2 files changed

+114
-61
lines changed

lib/ssh/src/ssh_connection.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1523,6 +1523,7 @@ channel_data_reply_msg(ChannelId, Connection, DataType, Data) ->
15231523
WantedSize = Size - size(Data),
15241524
ssh_client_channel:cache_update(Connection#connection.channel_cache,
15251525
Channel#channel{recv_window_size = WantedSize}),
1526+
adjust_window(self(), ChannelId, byte_size(Data)),
15261527
reply_msg(Channel, Connection, {data, ChannelId, DataType, Data});
15271528
undefined ->
15281529
{[], Connection}

lib/ssh/test/ssh_connection_SUITE.erl

Lines changed: 113 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,10 @@
2121
%%
2222
-module(ssh_connection_SUITE).
2323

24-
-include_lib("common_test/include/ct.hrl").
2524
-include("ssh_connect.hrl").
2625
-include("ssh_test_lib.hrl").
26+
-include_lib("common_test/include/ct.hrl").
27+
-include_lib("stdlib/include/assert.hrl").
2728

2829

2930

@@ -60,7 +61,7 @@
6061
connect_timeout/1,
6162
daemon_sock_not_passive/1,
6263
daemon_sock_not_tcp/1,
63-
do_interrupted_send/3,
64+
do_interrupted_send/4,
6465
do_simple_exec/1,
6566
encode_decode_pty_opts/1,
6667
exec_disabled/1,
@@ -83,6 +84,7 @@
8384
send_after_exit/1,
8485
simple_eval/1,
8586
simple_exec/1,
87+
simple_exec_more_data/1,
8688
simple_exec_sock/1,
8789
simple_exec_two_socks/1,
8890
small_cat/1,
@@ -97,6 +99,7 @@
9799
start_shell_exec_direct_fun1_error_type/1,
98100
start_shell_exec_direct_fun2/1,
99101
start_shell_exec_direct_fun3/1,
102+
start_shell_exec_direct_fun_more_data/1,
100103
start_shell_exec_fun/1,
101104
start_shell_exec_fun2/1,
102105
start_shell_exec_fun3/1,
@@ -138,6 +141,7 @@ all() ->
138141
start_shell_exec_direct_fun,
139142
start_shell_exec_direct_fun2,
140143
start_shell_exec_direct_fun3,
144+
start_shell_exec_direct_fun_more_data,
141145
start_shell_exec_direct_fun1_error,
142146
start_shell_exec_direct_fun1_error_type,
143147
start_exec_direct_fun1_read_write,
@@ -178,6 +182,7 @@ groups() ->
178182

179183
payload() ->
180184
[simple_exec,
185+
simple_exec_more_data,
181186
simple_exec_sock,
182187
simple_exec_two_socks,
183188
small_cat,
@@ -238,6 +243,10 @@ simple_exec(Config) when is_list(Config) ->
238243
ConnectionRef = ssh_test_lib:connect(?SSH_DEFAULT_PORT, []),
239244
do_simple_exec(ConnectionRef).
240245

246+
simple_exec_more_data(Config) when is_list(Config) ->
247+
ConnectionRef = ssh_test_lib:connect(?SSH_DEFAULT_PORT, []),
248+
%% more data received, SSH window adjust needs to be sent by client
249+
do_simple_exec(ConnectionRef, 60000).
241250
%%--------------------------------------------------------------------
242251
simple_exec_sock(_Config) ->
243252
{ok, Sock} = ssh_test_lib:gen_tcp_connect(?SSH_DEFAULT_PORT, [{active,false}]),
@@ -488,9 +497,6 @@ big_cat(Config) when is_list(Config) ->
488497
%% build 10MB binary
489498
Data = << <<X:32>> || X <- lists:seq(1,2500000)>>,
490499

491-
%% pre-adjust receive window so the other end doesn't block
492-
ssh_connection:adjust_window(ConnectionRef, ChannelId0, size(Data)),
493-
494500
ct:log("sending ~p byte binary~n",[size(Data)]),
495501
ok = ssh_connection:send(ConnectionRef, ChannelId0, Data, 10000),
496502
ok = ssh_connection:send_eof(ConnectionRef, ChannelId0),
@@ -628,15 +634,18 @@ ptty_alloc_pixel(Config) when is_list(Config) ->
628634
ssh:close(ConnectionRef).
629635

630636
%%--------------------------------------------------------------------
631-
small_interrupted_send(Config) ->
637+
small_interrupted_send(Config) ->
632638
K = 1024,
633-
M = K*K,
634-
do_interrupted_send(Config, 10*M, 4*K).
639+
SendSize = 10 * K * K,
640+
EchoSize = 4 * K,
641+
do_interrupted_send(Config, SendSize, EchoSize, {error, closed}).
635642
interrupted_send(Config) ->
636-
M = 1024*1024,
637-
do_interrupted_send(Config, 10*M, 4*M).
643+
K = 1024,
644+
SendSize = 10 * K * K,
645+
EchoSize = 4 * K * K,
646+
do_interrupted_send(Config, SendSize, EchoSize, ok).
638647

639-
do_interrupted_send(Config, SendSize, EchoSize) ->
648+
do_interrupted_send(Config, SendSize, EchoSize, SenderResult) ->
640649
PrivDir = proplists:get_value(priv_dir, Config),
641650
UserDir = filename:join(PrivDir, nopubkey), % to make sure we don't use public-key-auth
642651
file:make_dir(UserDir),
@@ -646,21 +655,17 @@ do_interrupted_send(Config, SendSize, EchoSize) ->
646655
{user_dir, UserDir},
647656
{password, "morot"},
648657
{subsystems, [{"echo_n",EchoSS_spec}]}]),
649-
650658
ct:log("~p:~p connect", [?MODULE,?LINE]),
651659
ConnectionRef = ssh_test_lib:connect(Host, Port, [{silently_accept_hosts, true},
652660
{user, "foo"},
653661
{password, "morot"},
654662
{user_interaction, false},
655663
{user_dir, UserDir}]),
656664
ct:log("~p:~p connected", [?MODULE,?LINE]),
657-
658665
%% build big binary
659666
Data = << <<X:32>> || X <- lists:seq(1,SendSize div 4)>>,
660-
661667
%% expect remote end to send us EchoSize back
662668
<<ExpectedData:EchoSize/binary, _/binary>> = Data,
663-
664669
%% Spawn listener. Otherwise we could get a deadlock due to filled buffers
665670
Parent = self(),
666671
ResultPid = spawn(
@@ -671,11 +676,10 @@ do_interrupted_send(Config, SendSize, EchoSize) ->
671676
case ssh_connection:subsystem(ConnectionRef, ChannelId, "echo_n", infinity) of
672677
success ->
673678
Parent ! {self(), channelId, ChannelId},
674-
675-
Result =
679+
Result =
676680
try collect_data(ConnectionRef, ChannelId, EchoSize)
677681
of
678-
ExpectedData ->
682+
ExpectedData ->
679683
ct:log("~p:~p got expected data",[?MODULE,?LINE]),
680684
ok;
681685
Other ->
@@ -690,14 +694,12 @@ do_interrupted_send(Config, SendSize, EchoSize) ->
690694
Parent ! {self(), channelId, error, Other}
691695
end
692696
end),
693-
694697
receive
695698
{ResultPid, channelId, error, Other} ->
696699
ct:log("~p:~p channelId error ~p", [?MODULE,?LINE,Other]),
697700
ssh:close(ConnectionRef),
698701
ssh:stop_daemon(Pid),
699702
{fail, "ssh_connection:subsystem"};
700-
701703
{ResultPid, channelId, ChannelId} ->
702704
ct:log("~p:~p ~p going to send ~p bytes", [?MODULE,?LINE,self(),size(Data)]),
703705
SenderPid = spawn(fun() ->
@@ -707,23 +709,24 @@ do_interrupted_send(Config, SendSize, EchoSize) ->
707709
{ResultPid, result, {fail, Fail}} ->
708710
ct:log("~p:~p Listener failed: ~p", [?MODULE,?LINE,Fail]),
709711
{fail, Fail};
710-
711712
{ResultPid, result, Result} ->
712713
ct:log("~p:~p Got result: ~p", [?MODULE,?LINE,Result]),
713714
ssh:close(ConnectionRef),
714715
ssh:stop_daemon(Pid),
715716
ct:log("~p:~p Check sender", [?MODULE,?LINE]),
716717
receive
717-
{SenderPid, {error, closed}} ->
718-
ct:log("~p:~p {error,closed} - That's what we expect :)",[?MODULE,?LINE]),
718+
{SenderPid, SenderResult} ->
719+
ct:log("~p:~p ~p - That's what we expect :)",
720+
[?MODULE,?LINE, SenderResult]),
719721
ok;
720722
Msg ->
721723
ct:log("~p:~p Not expected send result: ~p",[?MODULE,?LINE,Msg]),
722724
{fail, "Not expected msg"}
723725
end;
724-
725-
{SenderPid, {error, closed}} ->
726-
ct:log("~p:~p {error,closed} - That's what we expect, but client channel handler has not reported yet",[?MODULE,?LINE]),
726+
{SenderPid, SenderResult} ->
727+
ct:log("~p:~p ~p - That's what we expect, "
728+
"but client channel handler has not reported yet",
729+
[?MODULE,?LINE, SenderResult]),
727730
receive
728731
{ResultPid, result, Result} ->
729732
ct:log("~p:~p Now got the result: ~p", [?MODULE,?LINE,Result]),
@@ -734,7 +737,6 @@ do_interrupted_send(Config, SendSize, EchoSize) ->
734737
ct:log("~p:~p Got an unexpected msg ~p",[?MODULE,?LINE,Msg]),
735738
{fail, "Un-expected msg"}
736739
end;
737-
738740
Msg ->
739741
ct:log("~p:~p Got unexpected ~p",[?MODULE,?LINE,Msg]),
740742
{fail, "Unexpected msg"}
@@ -933,6 +935,24 @@ start_shell_exec_direct_fun3(Config) ->
933935
"testing", <<"echo foo testing">>, 0,
934936
Config).
935937

938+
start_shell_exec_direct_fun_more_data(Config) ->
939+
N = 60000,
940+
ExpectedBin = <<"testing\n">>,
941+
ReceiveFun =
942+
fun(ConnectionRef, ChannelId, _Expect, _ExpectType) ->
943+
receive_bytes(ConnectionRef, ChannelId,
944+
N * byte_size(ExpectedBin), 0)
945+
end,
946+
do_start_shell_exec_fun({direct,
947+
fun(_Cmd) ->
948+
{ok,
949+
[io_lib:format("testing~n",[]) ||
950+
_ <- lists:seq(1, N)]}
951+
end},
952+
"not_relevant", <<"not_used\n">>, 0,
953+
ReceiveFun,
954+
Config).
955+
936956
start_shell_exec_direct_fun1_error(Config) ->
937957
do_start_shell_exec_fun({direct, fun(_Cmd) -> {error, {bad}} end},
938958
"testing", <<"**Error** {bad}">>, 1,
@@ -1067,6 +1087,28 @@ simple_eval(Inp) -> {simple_eval,Inp}.
10671087

10681088

10691089
do_start_shell_exec_fun(Fun, Command, Expect, ExpectType, Config) ->
1090+
DefaultReceiveFun =
1091+
fun(ConnectionRef, ChannelId, Expect, ExpectType) ->
1092+
receive
1093+
{ssh_cm, ConnectionRef, {data, ChannelId, ExpectType, Expect}} ->
1094+
ok
1095+
after 5000 ->
1096+
receive
1097+
Other ->
1098+
ct:log("Received other:~n~p~nExpected: ~p~n",
1099+
[Other,
1100+
{ssh_cm, ConnectionRef,
1101+
{data, ChannelId, ExpectType, Expect}}]),
1102+
%% {data, '_ChannelId', ExpectType, Expect}}]),
1103+
ct:fail("Unexpected response")
1104+
after 0 ->
1105+
ct:fail("Exec Timeout")
1106+
end
1107+
end
1108+
end,
1109+
do_start_shell_exec_fun(Fun, Command, Expect, ExpectType, DefaultReceiveFun, Config).
1110+
1111+
do_start_shell_exec_fun(Fun, Command, Expect, ExpectType, ReceiveFun, Config) ->
10701112
PrivDir = proplists:get_value(priv_dir, Config),
10711113
UserDir = filename:join(PrivDir, nopubkey), % to make sure we don't use public-key-auth
10721114
file:make_dir(UserDir),
@@ -1082,24 +1124,9 @@ do_start_shell_exec_fun(Fun, Command, Expect, ExpectType, Config) ->
10821124
{user_interaction, true},
10831125
{user_dir, UserDir}]),
10841126

1085-
{ok, ChannelId0} = ssh_connection:session_channel(ConnectionRef, infinity),
1086-
1087-
success = ssh_connection:exec(ConnectionRef, ChannelId0, Command, infinity),
1088-
1089-
receive
1090-
{ssh_cm, ConnectionRef, {data, _ChannelId, ExpectType, Expect}} ->
1091-
ok
1092-
after 5000 ->
1093-
receive
1094-
Other ->
1095-
ct:log("Received other:~n~p~nExpected: ~p~n",
1096-
[Other, {ssh_cm, ConnectionRef, {data, '_ChannelId', ExpectType, Expect}} ]),
1097-
ct:fail("Unexpected response")
1098-
after 0 ->
1099-
ct:fail("Exec Timeout")
1100-
end
1101-
end,
1102-
1127+
{ok, ChannelId} = ssh_connection:session_channel(ConnectionRef, infinity),
1128+
success = ssh_connection:exec(ConnectionRef, ChannelId, Command, infinity),
1129+
ReceiveFun(ConnectionRef, ChannelId, Expect, ExpectType),
11031130
ssh:close(ConnectionRef),
11041131
ssh:stop_daemon(Pid).
11051132

@@ -1660,17 +1687,26 @@ max_channels_option(Config) when is_list(Config) ->
16601687
%%--------------------------------------------------------------------
16611688
%% Internal functions ------------------------------------------------
16621689
%%--------------------------------------------------------------------
1663-
16641690
do_simple_exec(ConnectionRef) ->
1691+
do_simple_exec(ConnectionRef, 1).
1692+
1693+
do_simple_exec(ConnectionRef, N) ->
16651694
{ok, ChannelId0} = ssh_connection:session_channel(ConnectionRef, infinity),
1666-
success = ssh_connection:exec(ConnectionRef, ChannelId0,
1667-
"echo testing", infinity),
1668-
%% receive response to input
1669-
receive
1670-
{ssh_cm, ConnectionRef, {data, ChannelId0, 0, <<"testing\n">>}} ->
1671-
ok
1672-
after
1673-
10000 -> ct:fail("timeout ~p:~p",[?MODULE,?LINE])
1695+
Cmd = "yes testing | head -n " ++ integer_to_list(N),
1696+
ct:log("Cmd to be invoked over SSH shell: ~p", [Cmd]),
1697+
success = ssh_connection:exec(ConnectionRef, ChannelId0, Cmd, infinity),
1698+
ExpectedBin = <<"testing\n">>,
1699+
case N of
1700+
1 ->
1701+
%% receive response to input
1702+
receive
1703+
{ssh_cm, ConnectionRef, {data, ChannelId0, 0, ExpectedBin}} ->
1704+
ok
1705+
after
1706+
10000 -> ct:fail("timeout ~p:~p",[?MODULE,?LINE])
1707+
end;
1708+
_ ->
1709+
receive_bytes(ConnectionRef, ChannelId0, N * byte_size(ExpectedBin), 0)
16741710
end,
16751711

16761712
%% receive close messages
@@ -1785,6 +1821,7 @@ test_exec_is_enabled(ConnectionRef, Exec, Expect) ->
17851821
{ssh_cm, ConnectionRef, {data, ChannelId, 0, <<Expect:ExpSz/binary, _/binary>>}} = R ->
17861822
ct:log("~p:~p Got expected ~p",[?MODULE,?LINE,R]);
17871823
Other ->
1824+
%% FIXME - should this testcase fail when unexpected data is received?
17881825
ct:log("~p:~p Got unexpected ~p~nExpect: ~p~n",
17891826
[?MODULE,?LINE, Other, {ssh_cm, ConnectionRef, {data, ChannelId, 0, Expect}} ])
17901827
after 5000 ->
@@ -1798,8 +1835,6 @@ big_cat_rx(ConnectionRef, ChannelId) ->
17981835
big_cat_rx(ConnectionRef, ChannelId, Acc) ->
17991836
receive
18001837
{ssh_cm, ConnectionRef, {data, ChannelId, 0, Data}} ->
1801-
%% ssh_connection:adjust_window(ConnectionRef, ChannelId, size(Data)),
1802-
%% window was pre-adjusted, don't adjust again here
18031838
big_cat_rx(ConnectionRef, ChannelId, [Data | Acc]);
18041839
{ssh_cm, ConnectionRef, {eof, ChannelId}} ->
18051840
{ok, iolist_to_binary(lists:reverse(Acc))}
@@ -1808,7 +1843,8 @@ big_cat_rx(ConnectionRef, ChannelId, Acc) ->
18081843
end.
18091844

18101845
collect_data(ConnectionRef, ChannelId, EchoSize) ->
1811-
ct:log("~p:~p Listener ~p running! ConnectionRef=~p, ChannelId=~p",[?MODULE,?LINE,self(),ConnectionRef,ChannelId]),
1846+
ct:log("~p:~p Listener ~p running! ConnectionRef=~p, ChannelId=~p",
1847+
[?MODULE,?LINE,self(),ConnectionRef,ChannelId]),
18121848
collect_data(ConnectionRef, ChannelId, EchoSize, [], 0).
18131849

18141850
collect_data(ConnectionRef, ChannelId, EchoSize, Acc, Sum) ->
@@ -1817,18 +1853,14 @@ collect_data(ConnectionRef, ChannelId, EchoSize, Acc, Sum) ->
18171853
{ssh_cm, ConnectionRef, {data, ChannelId, 0, Data}} when is_binary(Data) ->
18181854
ct:log("~p:~p collect_data: received ~p bytes. total ~p bytes, want ~p more",
18191855
[?MODULE,?LINE,size(Data),Sum+size(Data),EchoSize-Sum]),
1820-
ssh_connection:adjust_window(ConnectionRef, ChannelId, size(Data)),
18211856
collect_data(ConnectionRef, ChannelId, EchoSize, [Data | Acc], Sum+size(Data));
18221857
{ssh_cm, ConnectionRef, Msg={eof, ChannelId}} ->
18231858
collect_data_report_end(Acc, Msg, EchoSize);
1824-
18251859
{ssh_cm, ConnectionRef, Msg={closed,ChannelId}} ->
18261860
collect_data_report_end(Acc, Msg, EchoSize);
1827-
18281861
Msg ->
18291862
ct:log("~p:~p collect_data: ***** unexpected message *****~n~p",[?MODULE,?LINE,Msg]),
18301863
collect_data(ConnectionRef, ChannelId, EchoSize, Acc, Sum)
1831-
18321864
after TO ->
18331865
ct:log("~p:~p collect_data: ----- Nothing received for ~p seconds -----~n",[?MODULE,?LINE,TO]),
18341866
collect_data(ConnectionRef, ChannelId, EchoSize, Acc, Sum)
@@ -1867,3 +1899,23 @@ ssh_exec_echo(Cmd, User) ->
18671899
spawn(fun() ->
18681900
io:format("echo ~s ~s\n",[User,Cmd])
18691901
end).
1902+
%% FIXME - upon refactoring this test suite, check if function below is reduntant to collect_data
1903+
receive_bytes(_, _, 0, _) ->
1904+
ct:log("ALL DATA RECEIVED Budget = 0"),
1905+
ct:log("================================ ExpectBudget = 0 (reception completed)"),
1906+
ok;
1907+
receive_bytes(ConnectionRef, ChannelId0, Budget, AccSize) when Budget > 0 ->
1908+
receive
1909+
{ssh_cm, ConnectionRef, {data, ChannelId0, 0, D}} ->
1910+
Fmt = "================================ ExpectBudget = "
1911+
"~p bytes Received/Total = ~p/~p bytes",
1912+
Args = [Budget, byte_size(D), AccSize + byte_size(D)],
1913+
ct:log(Fmt, Args),
1914+
receive_bytes(ConnectionRef, ChannelId0,
1915+
Budget - byte_size(D), AccSize + byte_size(D))
1916+
after
1917+
10000 ->
1918+
ct:log("process_info(self(), messages) = ~p",
1919+
[process_info(self(), messages)]),
1920+
ct:fail("timeout ~p:~p",[?MODULE,?LINE])
1921+
end.

0 commit comments

Comments
 (0)