[erts] Fix enqueue into dirty run queues
OTP-18839

When scheduling a process for dirty execution, it could end up not being
inserted into the run queue, causing the process to get stuck forever.
rickard-green committed Nov 14, 2023
1 parent ec48b1f commit c809a60
Showing 2 changed files with 98 additions and 31 deletions.
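
The bug described above is a lost-wakeup race: the decision "does this process still need to be put into a run queue?" must be taken against the same atomic state word that the scheduler updates when scheduling the process out, not against separate bookkeeping that another scheduler may be changing concurrently. A minimal, self-contained C sketch of the safe pattern follows; it is a toy model, not ERTS code, and the three flags plus the single state word are simplified stand-ins for p->state and the ERTS_PSFLG_* bits:

#include <stdatomic.h>
#include <stdio.h>

#define PSFLG_ACTIVE  (1u << 0)   /* the process has work to do */
#define PSFLG_RUNNING (1u << 1)   /* currently executing on a scheduler */
#define PSFLG_IN_RUNQ (1u << 2)   /* already inserted in a run queue */

static _Atomic unsigned state = PSFLG_ACTIVE | PSFLG_RUNNING;

/* Clear RUNNING and report whether the caller must (re)insert the
 * process into a run queue.  The decision and the state transition
 * commit in one compare-and-swap loop, mirroring the retry loop in
 * schedule_out_process(). */
static int schedule_out(void)
{
    unsigned e = atomic_load(&state);
    for (;;) {
        unsigned n = e & ~PSFLG_RUNNING;
        int enqueue = (n & PSFLG_ACTIVE) && !(n & PSFLG_IN_RUNQ);
        if (enqueue)
            n |= PSFLG_IN_RUNQ;
        if (atomic_compare_exchange_strong(&state, &e, n))
            return enqueue;   /* caller inserts the process iff nonzero */
        /* CAS failed: e now holds the freshly read state; retry. */
    }
}

int main(void)
{
    printf("insert into run queue: %s\n", schedule_out() ? "yes" : "no");
    return 0;
}

Because the enqueue decision and the flag update commit atomically, no interleaving can leave the process in the "runnable, but neither running nor queued" state the commit message describes.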
49 changes: 19 additions & 30 deletions erts/emulator/beam/erl_process.c
@@ -6388,44 +6388,25 @@ static int
 check_dirty_enqueue_in_prio_queue(Process *c_p,
                                   erts_aint32_t *newp,
                                   erts_aint32_t actual,
-                                  erts_aint32_t aprio,
-                                  erts_aint32_t qbit)
+                                  erts_aint32_t aprio)
 {
     int queue;
-    erts_aint32_t dact, max_qbit;
 
     /* Termination should be done on an ordinary scheduler */
     if ((*newp) & ERTS_PSFLG_EXITING) {
         *newp &= ~ERTS_PSFLGS_DIRTY_WORK;
         return ERTS_ENQUEUE_NORMAL_QUEUE;
     }
 
-    /*
-     * If we have system tasks, we enqueue on ordinary run-queue
-     * and take care of those system tasks first.
-     */
-    if ((*newp) & ERTS_PSFLG_SYS_TASKS)
-        return ERTS_ENQUEUE_NORMAL_QUEUE;
-
-    dact = erts_atomic32_read_mb(&c_p->xstate);
     if (actual & (ERTS_PSFLG_DIRTY_ACTIVE_SYS
                   | ERTS_PSFLG_DIRTY_CPU_PROC)) {
-        max_qbit = ((dact >> ERTS_PXSFLGS_IN_CPU_PRQ_MASK_OFFSET)
-                    & ERTS_PXSFLGS_QMASK);
         queue = ERTS_ENQUEUE_DIRTY_CPU_QUEUE;
     }
     else {
         ASSERT(actual & ERTS_PSFLG_DIRTY_IO_PROC);
-        max_qbit = ((dact >> ERTS_PXSFLGS_IN_IO_PRQ_MASK_OFFSET)
-                    & ERTS_PXSFLGS_QMASK);
         queue = ERTS_ENQUEUE_DIRTY_IO_QUEUE;
     }
 
-    max_qbit |= 1 << ERTS_PSFLGS_QMASK_BITS;
-    max_qbit &= -max_qbit;
-
-    if (qbit >= max_qbit)
-        return ERTS_ENQUEUE_NOT; /* Already queued in higher or equal prio */
     if ((actual & (ERTS_PSFLG_IN_RUNQ|ERTS_PSFLGS_USR_PRIO_MASK))
         != (aprio << ERTS_PSFLGS_USR_PRIO_OFFSET)) {
         /*
@@ -6487,19 +6468,26 @@ check_enqueue_in_prio_queue(Process *c_p,
                             erts_aint32_t *newp,
                             erts_aint32_t actual)
 {
-    erts_aint32_t aprio, qbit, max_qbit;
+    erts_aint32_t aprio, qbit, max_qbit, new = *newp;
 
-    aprio = ((*newp) >> ERTS_PSFLGS_ACT_PRIO_OFFSET) & ERTS_PSFLGS_PRIO_MASK;
+    aprio = (new >> ERTS_PSFLGS_ACT_PRIO_OFFSET) & ERTS_PSFLGS_PRIO_MASK;
     qbit = 1 << aprio;
 
     *prq_prio_p = aprio;
 
-    if (((actual & (ERTS_PSFLG_SUSPENDED
-                    | ERTS_PSFLG_ACTIVE_SYS)) != (ERTS_PSFLG_SUSPENDED
-                                                  | ERTS_PSFLG_ACTIVE_SYS))
-        & (!!(actual & ERTS_PSFLGS_DIRTY_WORK))) {
-        int res = check_dirty_enqueue_in_prio_queue(c_p, newp, actual,
-                                                    aprio, qbit);
+    if ((new & (ERTS_PSFLG_SUSPENDED
+                | ERTS_PSFLG_ACTIVE_SYS
+                | ERTS_PSFLG_DIRTY_ACTIVE_SYS)) == ERTS_PSFLG_SUSPENDED) {
+        /*
+         * Do not schedule this process since we are suspended and we have
+         * no system work to do for the process...
+         */
+        return ERTS_ENQUEUE_NOT;
+    }
+
+    if ((!(new & ERTS_PSFLG_SYS_TASKS))
+        & (!!(new & ERTS_PSFLGS_DIRTY_WORK))) {
+        int res = check_dirty_enqueue_in_prio_queue(c_p, newp, actual, aprio);
         if (res != ERTS_ENQUEUE_NORMAL_QUEUE)
             return res;
     }
@@ -6669,8 +6657,9 @@ schedule_out_process(ErtsRunQueue *c_rq, erts_aint32_t state, Process *p,
                == ERTS_PSFLG_ACTIVE));
 
     n &= ~running_flgs;
-    if ((!!(a & (ERTS_PSFLG_ACTIVE_SYS|ERTS_PSFLG_DIRTY_ACTIVE_SYS))
-         | ((a & (ERTS_PSFLG_ACTIVE|ERTS_PSFLG_SUSPENDED)) == ERTS_PSFLG_ACTIVE))) {
+    if (a & (ERTS_PSFLG_ACTIVE_SYS
+             | ERTS_PSFLG_DIRTY_ACTIVE_SYS
+             | ERTS_PSFLG_ACTIVE)) {
         enqueue = check_enqueue_in_prio_queue(p, &enq_prio, &n, a);
     }
     a = erts_atomic32_cmpxchg_mb(&p->state, n, e);
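
Taken together, the three hunks above move all of the enqueue gating onto the main process state word: a suspended process with no active (dirty) system work is not enqueued at all, a process with pending system tasks is sent to an ordinary run queue so those tasks run first, and only after that is remaining dirty work routed to the dirty CPU/IO queues. Below is a compressed sketch of that routing, with made-up flag values standing in for the ERTS_PSFLG_* constants and one DIRTY_WORK bit standing in for the dirty-CPU/dirty-IO pair:

#include <stdio.h>

#define PSFLG_SUSPENDED        (1u << 0)
#define PSFLG_ACTIVE_SYS       (1u << 1)
#define PSFLG_DIRTY_ACTIVE_SYS (1u << 2)
#define PSFLG_SYS_TASKS        (1u << 3)
#define PSFLG_DIRTY_WORK       (1u << 4)

enum { ENQUEUE_NOT, ENQUEUE_NORMAL_QUEUE, ENQUEUE_DIRTY_QUEUE };

static int route(unsigned new)
{
    /* "(new & (A|B|C)) == A" encodes: A set, B and C both clear.
     * Suspended with no system work anywhere: do not schedule. */
    if ((new & (PSFLG_SUSPENDED
                | PSFLG_ACTIVE_SYS
                | PSFLG_DIRTY_ACTIVE_SYS)) == PSFLG_SUSPENDED)
        return ENQUEUE_NOT;

    /* System tasks are served on an ordinary scheduler first; only
     * processes with dirty work and no system tasks go dirty. */
    if (!(new & PSFLG_SYS_TASKS) && (new & PSFLG_DIRTY_WORK))
        return ENQUEUE_DIRTY_QUEUE;

    return ENQUEUE_NORMAL_QUEUE;
}

int main(void)
{
    printf("%d\n", route(PSFLG_SUSPENDED));                    /* 0: not    */
    printf("%d\n", route(PSFLG_SUSPENDED | PSFLG_ACTIVE_SYS)); /* 1: normal */
    printf("%d\n", route(PSFLG_DIRTY_WORK));                   /* 2: dirty  */
    printf("%d\n", route(PSFLG_DIRTY_WORK | PSFLG_SYS_TASKS)); /* 1: normal */
    return 0;
}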
80 changes: 79 additions & 1 deletion erts/emulator/test/signal_SUITE.erl
@@ -62,7 +62,8 @@
          simultaneous_signals_exit/1,
          simultaneous_signals_recv_exit/1,
          parallel_signal_enqueue_race_1/1,
-         parallel_signal_enqueue_race_2/1]).
+         parallel_signal_enqueue_race_2/1,
+         dirty_schedule/1]).
 
 -export([spawn_spammers/3]).
 
@@ -102,6 +103,7 @@ all() ->
      monitor_nodes_order,
      parallel_signal_enqueue_race_1,
      parallel_signal_enqueue_race_2,
+     dirty_schedule,
      {group, adjust_message_queue}].
 
 groups() ->
@@ -1308,6 +1310,82 @@ parallel_signal_enqueue_race_2_test() ->
     false = is_process_alive(R),
     ok.
 
+dirty_schedule(Config) when is_list(Config) ->
+    lists:foreach(fun (_) ->
+                          dirty_schedule_test()
+                  end,
+                  lists:seq(1, 5)),
+    ok.
+
+dirty_schedule_test() ->
+    %%
+    %% PR-7822 (second commit)
+    %%
+    %% This bug could occur when a process was to be scheduled due to an
+    %% incoming signal just as the receiving process was selected for
+    %% execution on a dirty scheduler. The process could then be inserted
+    %% into a run-queue at the same time as it began executing dirty. If
+    %% the scheduled instance was selected for execution on one dirty
+    %% scheduler at the same time as it was scheduled out on another
+    %% scheduler, a race could cause the thread scheduling out the process
+    %% to think it already was in the run-queue, and thus skip inserting
+    %% it, while the other thread, selecting it for execution, dropped the
+    %% process since it was already running on another scheduler. The
+    %% process then ended up stuck: in a runnable state, but not in the
+    %% run-queue.
+    %%
+    %% When the bug was triggered, the receiver could end up in an
+    %% inconsistent state where it potentially would be stuck forever.
+    %%
+    %% The above scenario is very hard to trigger, so the test typically
+    %% does not fail even with the bug present, but we at least try to
+    %% massage the scenario...
+    %%
+    Proc = spawn_link(fun DirtyLoop() ->
+                              erts_debug:dirty_io(scheduler, type),
+                              DirtyLoop()
+                      end),
+    NoPs = lists:seq(1, erlang:system_info(schedulers_online)),
+    SpawnSender =
+        fun (Prio) ->
+                spawn_opt(
+                  fun () ->
+                          Loop = fun Loop(0) ->
+                                         ok;
+                                     Loop(N) ->
+                                         _ = process_info(Proc,
+                                                          current_function),
+                                         Loop(N-1)
+                                 end,
+                          receive go -> ok end,
+                          Loop(100000)
+                  end, [monitor, {priority, Prio}])
+        end,
+    Go = fun ({P, _M}) -> P ! go end,
+    WaitProcs = fun ({P, M}) ->
+                        receive {'DOWN', M, process, P, R} ->
+                                normal = R
+                        end
+                end,
+    PM1s = lists:map(fun (_) -> SpawnSender(normal) end, NoPs),
+    lists:foreach(Go, PM1s),
+    lists:foreach(WaitProcs, PM1s),
+    PM2s = lists:map(fun (_) -> SpawnSender(high) end, NoPs),
+    lists:foreach(Go, PM2s),
+    lists:foreach(WaitProcs, PM2s),
+    PM3s = lists:map(fun (N) ->
+                             Prio = case N rem 2 of
+                                        0 -> normal;
+                                        1 -> high
+                                    end,
+                             SpawnSender(Prio)
+                     end, NoPs),
+    lists:foreach(Go, PM3s),
+    lists:foreach(WaitProcs, PM3s),
+    unlink(Proc),
+    exit(Proc, kill),
+    false = is_process_alive(Proc),
+    ok.
 
 %%
 %% -- Internal utils --------------------------------------------------------
 %%
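
The interleaving described in the test's comment can be replayed step by step in a toy model. The sketch below is illustrative only: the two dirty schedulers D1/D2, the separate bookkeeping word, and all names are assumptions of the model rather than ERTS identifiers. Run sequentially, it ends in exactly the stuck state the test tries to provoke:

#include <stdio.h>

#define ACTIVE  (1u << 0)
#define RUNNING (1u << 1)

/* P is executing dirty on scheduler D1 and, due to an incoming signal,
 * has also just been inserted into a dirty run queue. */
static unsigned state = ACTIVE | RUNNING;
static int p_in_runq = 1;        /* P physically present in the queue     */
static unsigned stale_qbit = 1;  /* separate "in dirty queue" bookkeeping */

int main(void)
{
    /* D2 selects P from the run queue, sees it already running on D1,
     * and drops it. */
    p_in_runq = 0;

    /* Concurrently, D1 schedules P out.  The pre-fix check consulted
     * the separate bookkeeping word, which still says "queued", so the
     * re-insert is skipped. */
    state &= ~RUNNING;
    if (!stale_qbit)
        p_in_runq = 1;           /* the insert that never happens */

    /* D2 finishes dropping P and clears the bookkeeping bit. */
    stale_qbit = 0;

    printf("runnable: %s, running: no, in run queue: %s  => stuck\n",
           (state & ACTIVE) ? "yes" : "no", p_in_runq ? "yes" : "no");
    return 0;
}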
