Skip to content

Commit 360c3a3

Browse files
authored
Merge pull request #185 from esl/cosmetics
Cosmetics changes to improve readability, types, and upgrades
2 parents 887e312 + 7d9299b commit 360c3a3

File tree

12 files changed

+145
-112
lines changed

12 files changed

+145
-112
lines changed

.github/workflows/ci.yml

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,18 @@ on:
44
push:
55
branches: [ master ]
66
pull_request:
7-
branches: [ master ]
7+
workflow_dispatch:
8+
89

910
jobs:
1011
test:
1112
name: ${{ matrix.test-type }} test on OTP ${{matrix.otp_vsn}}
1213
strategy:
1314
matrix:
14-
otp_vsn: ['26.2', '25.3', '24.3']
15-
rebar_vsn: ['3.22.0']
15+
otp_vsn: ['27', '26', '25']
16+
rebar_vsn: ['3.23.0']
1617
test-type: ['regular', 'integration']
17-
runs-on: 'ubuntu-22.04'
18+
runs-on: 'ubuntu-24.04'
1819
steps:
1920
- uses: actions/checkout@v4
2021
- uses: erlef/setup-beam@v1

guides/configuration.md

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,6 @@ Amoc supports the following generic configuration parameters:
99
* default value - empty list (`[]`)
1010
* example: `AMOC_NODES="['amoc@amoc-1', 'amoc@amoc-2']"`
1111

12-
* `api_port` - a port for the amoc REST interfaces:
13-
* default value - 4000
14-
* example: `AMOC_API_PORT="4000"`
1512

1613
* `interarrival` - a delay (in ms, for each node in the cluster independently) between creating the processes
1714
for two consecutive users:
Lines changed: 61 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
11
-module(dummy_helper).
22

3-
-required_variable(#{name=>dummy_var, description=>"dummy_var",
4-
default_value=>default_value}).
3+
-include_lib("stdlib/include/assert.hrl").
4+
5+
-required_variable(#{name => dummy_var,
6+
description => "dummy_var",
7+
default_value => default_value}).
8+
9+
-define(comment(U), io_lib:format("Condition failed with last users distribution ~n~p", [U])).
510

611
%% amoc_dist testing function
712
-export([test_amoc_dist/0]).
@@ -11,65 +16,73 @@ test_amoc_dist() ->
1116
Master = amoc_cluster:master_node(),
1217
Slaves = amoc_cluster:slave_nodes(),
1318
%% check the status of the nodes
14-
disabled = rpc:call(Master, amoc_controller, get_status, []),
15-
[{running, #{scenario := dummy_scenario}} = rpc:call(Node, amoc_controller, get_status, [])
16-
|| Node <- Slaves],
17-
%% check user ids
18-
{N1, Nodes1, Ids1, Max1} = get_users_info(Slaves),
19-
true = N1 > 0,
20-
N1 = Max1,
21-
Ids1 = lists:seq(1, N1),
19+
?assertEqual(disabled, get_status(Master)),
20+
[ ?assertMatch({running, #{scenario := dummy_scenario}}, get_status(Node)) || Node <- Slaves],
21+
%% check user ids, users have all been started at the first two nodes
22+
{N1, Max1, Nodes1, Ids1, Users1} = get_users_info(Slaves),
23+
?assert(N1 > 0),
24+
?assertEqual(N1, Max1, ?comment(Users1)),
25+
?assertEqual(Ids1, lists:seq(1, N1), ?comment(Users1)),
2226
[AddedNode] = Slaves -- Nodes1,
2327
%% add 20 users
24-
{ok, _} = rpc:call(Master, amoc_dist, add, [20]),
25-
timer:sleep(3000),
26-
{N2, Nodes2, Ids2, Max2} = get_users_info(Slaves),
27-
N2 = Max2,
28-
Ids2 = lists:seq(1, N2),
29-
[AddedNode] = Nodes2 -- Nodes1,
30-
N2 = N1 + 20,
28+
add_and_wait(Master, 20),
29+
{N2, Max2, Nodes2, Ids2, Users2} = get_users_info(Slaves),
30+
?assertEqual(N2, Max2, ?comment(Users2)),
31+
?assertEqual(Ids2, lists:seq(1, N2), ?comment(Users2)),
32+
?assertEqual([AddedNode], Nodes2 -- Nodes1, ?comment(Users2)),
33+
?assertEqual(N2, N1 + 20, ?comment(Users2)),
3134
%% remove 10 users
32-
{ok, _} = rpc:call(Master, amoc_dist, remove, [10, true]),
33-
timer:sleep(3000),
34-
{N3, Nodes3, _Ids3, Max3} = get_users_info(Slaves),
35-
Nodes2 = Nodes3,
36-
Max3 = Max2,
37-
N2 = N3 + 10,
35+
remove_and_wait(Master, 10),
36+
{N3, Max3, Nodes3, _Ids3, Users3} = get_users_info(Slaves),
37+
?assertEqual(N2 - 10, N3, ?comment(Users3)),
38+
?assertEqual(Max2, Max3, ?comment(Users3)),
39+
?assertEqual(Nodes2, Nodes3, ?comment(Users3)),
3840
%% try to remove N3 users
39-
{ok, Ret} = rpc:call(Master, amoc_dist, remove, [N3, true]),
41+
Ret = remove_and_wait(Master, N3),
4042
RemovedN = lists:sum([N || {_, N} <- Ret]),
41-
timer:sleep(3000),
42-
{N4, Nodes4, Ids4, _Max4} = get_users_info(Slaves),
43-
Nodes1 = Nodes4,
44-
N3 = N4 + RemovedN,
45-
true = RemovedN < N3,
43+
{N4, _Max4, Nodes4, Ids4, Users4} = get_users_info(Slaves),
44+
?assertEqual(N3 - RemovedN, N4, ?comment(Users4)),
45+
?assertEqual(Nodes1, Nodes4, ?comment(Users4)),
46+
?assert(RemovedN < N3),
4647
%% add 20 users
47-
{ok, _} = rpc:call(Master, amoc_dist, add, [20]),
48-
timer:sleep(3000),
49-
{N5, Nodes5, Ids5, Max5} = get_users_info(Slaves),
50-
Nodes2 = Nodes5,
51-
Max5 = Max2 + 20,
52-
N5 = N4 + 20,
53-
true = Ids5 -- Ids4 =:= lists:seq(Max2 + 1, Max5),
48+
add_and_wait(Master, 20),
49+
{N5, Max5, Nodes5, Ids5, Users5} = get_users_info(Slaves),
50+
?assertEqual(Nodes2, Nodes5, ?comment(Users5)),
51+
?assertEqual(Max5, Max2 + 20, ?comment(Users5)),
52+
?assertEqual(N5, N4 + 20, ?comment(Users5)),
53+
?assertEqual(Ids5 -- Ids4, lists:seq(Max2 + 1, Max5), ?comment(Users5)),
5454
%% terminate scenario
55-
{ok,_} = rpc:call(Master, amoc_dist, stop, []),
56-
timer:sleep(3000),
57-
[{finished, dummy_scenario} = rpc:call(Node, amoc_controller, get_status, [])
58-
|| Node <- Slaves],
55+
stop(Master),
56+
[ ?assertEqual({finished, dummy_scenario}, get_status(Node)) || Node <- Slaves],
5957
%% return expected value
6058
amoc_dist_works_as_expected
6159
catch
6260
C:E:S ->
63-
{error, {C, E, S}}
61+
{C, E, S}
6462
end.
6563

6664
get_users_info(SlaveNodes) ->
67-
Users = [{Node, Id} ||
68-
Node <- SlaveNodes,
69-
{_Pid, Id} <- rpc:call(Node, amoc_users_sup, get_all_children, [])],
70-
Ids = lists:usort([Id || {_, Id} <- Users]),
71-
Nodes = lists:usort([Node || {Node, _} <- Users]),
65+
Distrib = [ {Node, erpc:call(Node, amoc_users_sup, get_all_children, [])} || Node <- SlaveNodes ],
66+
Ids = lists:usort([Id || {_Node, Users} <- Distrib, {_, Id} <- Users]),
67+
Nodes = lists:usort([Node || {Node, Users} <- Distrib, [] =/= Users]),
7268
N = length(Ids),
73-
N = length(Users),
7469
MaxId = lists:max(Ids),
75-
{N, Nodes, Ids, MaxId}.
70+
{N, MaxId, Nodes, Ids, Distrib}.
71+
72+
add_and_wait(Master, Num) ->
73+
{ok, Ret} = erpc:call(Master, amoc_dist, add, [Num]),
74+
timer:sleep(3000),
75+
Ret.
76+
77+
remove_and_wait(Master, Num) ->
78+
{ok, Ret} = erpc:call(Master, amoc_dist, remove, [Num, true]),
79+
timer:sleep(3000),
80+
Ret.
81+
82+
stop(Master) ->
83+
{ok, Ret} = erpc:call(Master, amoc_dist, stop, []),
84+
timer:sleep(3000),
85+
Ret.
86+
87+
get_status(Node) ->
88+
erpc:call(Node, amoc_controller, get_status, []).

src/amoc_controller.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@
7979
%% ------------------------------------------------------------------
8080

8181
%% @private
82-
-spec start_link() -> {ok, pid()}.
82+
-spec start_link() -> gen_server:start_ret().
8383
start_link() ->
8484
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
8585

src/amoc_scenario.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ terminate(Scenario, State) ->
7373
%% if scenario module exports both functions, `Scenario:start/2' is used.
7474
%%
7575
%% Runs on the user process and spans a `[amoc, scenario, user, _]' telemetry event.
76-
-spec start(amoc:scenario(), user_id(), state()) -> any().
76+
-spec start(amoc:scenario(), user_id(), state()) -> term().
7777
start(Scenario, Id, State) ->
7878
Metadata = #{scenario => Scenario, state => State, user_id => Id},
7979
Span = case {erlang:function_exported(Scenario, start, 2),

src/amoc_sup.erl

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,14 @@
1111
-export([init/1]).
1212

1313
%% Helper macro for declaring children of supervisor
14-
-define(WORKER(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
15-
-define(SUP(I, Type), {I, {I, start_link, []}, permanent, infinity, Type, [I]}).
14+
-define(WORKER(I), {I, {I, start_link, []}, permanent, 5000, worker, [I]}).
15+
-define(SUP(I), {I, {I, start_link, []}, permanent, infinity, supervisor, [I]}).
1616

1717
%% ===================================================================
1818
%% API functions
1919
%% ===================================================================
2020

21-
-spec start_link() -> {ok, Pid :: pid()}.
21+
-spec start_link() -> supervisor:startlink_ret().
2222
start_link() ->
2323
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
2424

@@ -30,10 +30,10 @@ start_link() ->
3030
init([]) ->
3131
{ok, {#{strategy => one_for_all, intensity => 0},
3232
[
33-
?SUP(amoc_users_sup, supervisor),
34-
?SUP(amoc_throttle_sup, supervisor),
35-
?SUP(amoc_coordinator_sup, supervisor),
36-
?WORKER(amoc_controller, worker),
37-
?WORKER(amoc_cluster, worker),
38-
?WORKER(amoc_code_server, worker)
33+
?SUP(amoc_users_sup),
34+
?SUP(amoc_throttle_sup),
35+
?SUP(amoc_coordinator_sup),
36+
?WORKER(amoc_controller),
37+
?WORKER(amoc_cluster),
38+
?WORKER(amoc_code_server)
3939
]}}.

src/dist/amoc_dist.erl

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ get_state() ->
9494
case {amoc_cluster:master_node(), get_param(state)} of
9595
{undefined, undefined} -> idle;
9696
{_, {ok, State}} -> State;
97-
{Node, undefined} -> rpc:call(Node, ?MODULE, ?FUNCTION_NAME, [])
97+
{Node, undefined} -> erpc:call(Node, ?MODULE, ?FUNCTION_NAME, [])
9898
end.
9999

100100
%% ------------------------------------------------------------------
@@ -172,7 +172,7 @@ setup_slave_node(Node) ->
172172
{ok, _} ->
173173
{ok, Scenario} = get_param(scenario),
174174
{ok, Settings} = get_param(settings),
175-
rpc:call(Node, amoc_controller, start_scenario, [Scenario, Settings]);
175+
erpc:call(Node, amoc_controller, start_scenario, [Scenario, Settings]);
176176
Error -> Error
177177
end.
178178

@@ -196,7 +196,7 @@ add_users(Result, LastId, Count, [Node | T] = Nodes) ->
196196
0 ->
197197
add_users([{Node, {ok, node_skipped}} | Result], LastId, Count, T);
198198
N ->
199-
Ret = rpc:call(Node, amoc_controller, add_users, [LastId + 1, LastId + N]),
199+
Ret = erpc:call(Node, amoc_controller, add_users, [LastId + 1, LastId + N]),
200200
add_users([{Node, Ret} | Result], LastId + N, Count - N, T)
201201
end.
202202

@@ -213,7 +213,7 @@ remove_users(Result, Count, ForceRemove, [Node | T] = Nodes) ->
213213
0 ->
214214
remove_users([{Node, {ok, node_skipped}} | Result], Count, ForceRemove, T);
215215
N ->
216-
Ret = rpc:call(Node, amoc_controller, remove_users, [N, ForceRemove]),
216+
Ret = erpc:call(Node, amoc_controller, remove_users, [N, ForceRemove]),
217217
remove_users([{Node, Ret} | Result], Count - N, ForceRemove, T)
218218
end.
219219

@@ -225,7 +225,7 @@ update_settings_on_nodes(Settings, Nodes) ->
225225
-spec update_settings_on_node(amoc_config:settings(), node()) ->
226226
ok | {badrpc, any()} | {error, any()}.
227227
update_settings_on_node(Settings, Node) ->
228-
rpc:call(Node, amoc_controller, update_settings, [Settings]).
228+
erpc:call(Node, amoc_controller, update_settings, [Settings]).
229229

230230
-spec stop_cluster() -> {ok, any()} | {error, any()}.
231231
stop_cluster() ->
@@ -234,7 +234,7 @@ stop_cluster() ->
234234
{_, []} -> {error, no_slave_nodes};
235235
{MasterNode, Slaves} ->
236236
set_state(stopped),
237-
Result = [{Node, rpc:call(Node, amoc_controller, stop_scenario, [])} ||
237+
Result = [{Node, erpc:call(Node, amoc_controller, stop_scenario, [])} ||
238238
Node <- Slaves],
239239
maybe_error(Result);
240240
{_, _} -> {error, not_a_master}

src/throttle/amoc_throttle_process.erl

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
handle_info/2,
2525
handle_cast/2,
2626
handle_continue/2,
27-
format_status/2]).
27+
format_status/1]).
2828

2929
-define(PG_SCOPE, amoc_throttle).
3030
-define(DEFAULT_MSG_TIMEOUT, 60000).%% one minute
@@ -143,12 +143,13 @@ handle_continue(maybe_run_fn, State) ->
143143
NewState = maybe_run_fn(State),
144144
{noreply, NewState, timeout(NewState)}.
145145

146-
-spec format_status(term(), term()) -> term().
147-
format_status(_Opt, [_PDict, State]) ->
146+
-spec format_status(gen_server:format_status()) -> gen_server:format_status().
147+
format_status(#{state := #state{} = State} = FormatStatus) ->
148148
ScheduleLen = length(State#state.schedule),
149149
ScheduleRevLen = length(State#state.schedule_reversed),
150150
State1 = setelement(#state.schedule, State, ScheduleLen),
151-
setelement(#state.schedule_reversed, State1, ScheduleRevLen).
151+
State2 = setelement(#state.schedule_reversed, State1, ScheduleRevLen),
152+
FormatStatus#{state := State2}.
152153

153154
%%------------------------------------------------------------------------------
154155
%% internal functions

src/users/amoc_user.erl

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,15 @@
1414
start_link(Scenario, Id, State) ->
1515
proc_lib:start_link(?MODULE, init, [self(), Scenario, Id, State]).
1616

17-
-spec stop() -> no_return().
17+
-spec stop() -> ok.
1818
stop() ->
1919
stop(self(), false).
2020

2121
-spec stop(pid(), boolean()) -> ok.
2222
stop(Pid, Force) when is_pid(Pid) ->
2323
amoc_users_sup:stop_child(Pid, Force).
2424

25-
-spec init(pid(), amoc:scenario(), amoc_scenario:user_id(), state()) ->
26-
no_return().
25+
-spec init(pid(), amoc:scenario(), amoc_scenario:user_id(), state()) -> term().
2726
init(Parent, Scenario, Id, State) ->
2827
proc_lib:init_ack(Parent, {ok, self()}),
2928
process_flag(trap_exit, true),

src/users/amoc_users_sup.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,14 +139,14 @@ terminate_all_children() ->
139139
-spec get_sup_for_user_id(amoc_scenario:user_id()) -> pid().
140140
get_sup_for_user_id(Id) ->
141141
#storage{sups = Supervisors, sups_count = SupCount} = persistent_term:get(?MODULE),
142-
Index = Id rem SupCount + 1,
142+
Index = erlang:phash2(Id, SupCount) + 1,
143143
element(Index, Supervisors).
144144

145145
%% assign which users each worker will be requested to add
146146
-spec assign_users_to_sups(pos_integer(), tuple(), [amoc_scenario:user_id()], Acc) ->
147147
Acc when Acc :: #{pid() := [amoc_scenario:user_id()]}.
148148
assign_users_to_sups(SupCount, Supervisors, [Id | Ids], Acc) ->
149-
Index = Id rem SupCount + 1,
149+
Index = erlang:phash2(Id, SupCount) + 1,
150150
ChosenSup = element(Index, Supervisors),
151151
Vs = maps:get(ChosenSup, Acc),
152152
NewAcc = Acc#{ChosenSup := [Id | Vs]},

0 commit comments

Comments
 (0)