From c809a602f1f010b2502e769ee41b2272ae104a30 Mon Sep 17 00:00:00 2001
From: Rickard Green
Date: Thu, 2 Nov 2023 02:10:37 +0100
Subject: [PATCH] [erts] Fix enqueue into dirty run queues

OTP-18839

When scheduling a process for dirty execution, it could end up not
being inserted into the run queue, causing the process to get stuck
forever.
---
 erts/emulator/beam/erl_process.c    | 49 +++++++-----------
 erts/emulator/test/signal_SUITE.erl | 80 ++++++++++++++++++++++++++++-
 2 files changed, 98 insertions(+), 31 deletions(-)

diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c
index c5d5cdb6e50e..08f670836406 100644
--- a/erts/emulator/beam/erl_process.c
+++ b/erts/emulator/beam/erl_process.c
@@ -6388,11 +6388,9 @@ static int
 check_dirty_enqueue_in_prio_queue(Process *c_p,
                                   erts_aint32_t *newp,
                                   erts_aint32_t actual,
-                                  erts_aint32_t aprio,
-                                  erts_aint32_t qbit)
+                                  erts_aint32_t aprio)
 {
     int queue;
-    erts_aint32_t dact, max_qbit;
 
     /* Termination should be done on an ordinary scheduler */
     if ((*newp) & ERTS_PSFLG_EXITING) {
@@ -6400,32 +6398,15 @@ check_dirty_enqueue_in_prio_queue(Process *c_p,
         return ERTS_ENQUEUE_NORMAL_QUEUE;
     }
 
-    /*
-     * If we have system tasks, we enqueue on ordinary run-queue
-     * and take care of those system tasks first.
-     */
-    if ((*newp) & ERTS_PSFLG_SYS_TASKS)
-        return ERTS_ENQUEUE_NORMAL_QUEUE;
-
-    dact = erts_atomic32_read_mb(&c_p->xstate);
     if (actual & (ERTS_PSFLG_DIRTY_ACTIVE_SYS
                   | ERTS_PSFLG_DIRTY_CPU_PROC)) {
-        max_qbit = ((dact >> ERTS_PXSFLGS_IN_CPU_PRQ_MASK_OFFSET)
-                    & ERTS_PXSFLGS_QMASK);
         queue = ERTS_ENQUEUE_DIRTY_CPU_QUEUE;
     }
     else {
         ASSERT(actual & ERTS_PSFLG_DIRTY_IO_PROC);
-        max_qbit = ((dact >> ERTS_PXSFLGS_IN_IO_PRQ_MASK_OFFSET)
-                    & ERTS_PXSFLGS_QMASK);
         queue = ERTS_ENQUEUE_DIRTY_IO_QUEUE;
     }
 
-    max_qbit |= 1 << ERTS_PSFLGS_QMASK_BITS;
-    max_qbit &= -max_qbit;
-
-    if (qbit >= max_qbit)
-        return ERTS_ENQUEUE_NOT; /* Already queued in higher or equal prio */
     if ((actual & (ERTS_PSFLG_IN_RUNQ|ERTS_PSFLGS_USR_PRIO_MASK))
         != (aprio << ERTS_PSFLGS_USR_PRIO_OFFSET)) {
         /*
@@ -6487,19 +6468,26 @@ check_enqueue_in_prio_queue(Process *c_p,
                            erts_aint32_t *newp,
                            erts_aint32_t actual)
 {
-    erts_aint32_t aprio, qbit, max_qbit;
+    erts_aint32_t aprio, qbit, max_qbit, new = *newp;
 
-    aprio = ((*newp) >> ERTS_PSFLGS_ACT_PRIO_OFFSET) & ERTS_PSFLGS_PRIO_MASK;
+    aprio = (new >> ERTS_PSFLGS_ACT_PRIO_OFFSET) & ERTS_PSFLGS_PRIO_MASK;
     qbit = 1 << aprio;
 
     *prq_prio_p = aprio;
 
-    if (((actual & (ERTS_PSFLG_SUSPENDED
-                    | ERTS_PSFLG_ACTIVE_SYS)) != (ERTS_PSFLG_SUSPENDED
-                                                  | ERTS_PSFLG_ACTIVE_SYS))
-        & (!!(actual & ERTS_PSFLGS_DIRTY_WORK))) {
-        int res = check_dirty_enqueue_in_prio_queue(c_p, newp, actual,
-                                                    aprio, qbit);
+    if ((new & (ERTS_PSFLG_SUSPENDED
+                | ERTS_PSFLG_ACTIVE_SYS
+                | ERTS_PSFLG_DIRTY_ACTIVE_SYS)) == ERTS_PSFLG_SUSPENDED) {
+        /*
+         * Do not schedule this process since we are suspended and we have
+         * no system work to do for the process...
+         */
+        return ERTS_ENQUEUE_NOT;
+    }
+
+    if ((!(new & ERTS_PSFLG_SYS_TASKS))
+        & (!!(new & ERTS_PSFLGS_DIRTY_WORK))) {
+        int res = check_dirty_enqueue_in_prio_queue(c_p, newp, actual, aprio);
         if (res != ERTS_ENQUEUE_NORMAL_QUEUE)
             return res;
     }
@@ -6669,8 +6657,9 @@ schedule_out_process(ErtsRunQueue *c_rq, erts_aint32_t state, Process *p,
                    == ERTS_PSFLG_ACTIVE));
 
         n &= ~running_flgs;
-        if ((!!(a & (ERTS_PSFLG_ACTIVE_SYS|ERTS_PSFLG_DIRTY_ACTIVE_SYS))
-             | ((a & (ERTS_PSFLG_ACTIVE|ERTS_PSFLG_SUSPENDED)) == ERTS_PSFLG_ACTIVE))) {
+        if (a & (ERTS_PSFLG_ACTIVE_SYS
+                 | ERTS_PSFLG_DIRTY_ACTIVE_SYS
+                 | ERTS_PSFLG_ACTIVE)) {
             enqueue = check_enqueue_in_prio_queue(p, &enq_prio, &n, a);
         }
         a = erts_atomic32_cmpxchg_mb(&p->state, n, e);
diff --git a/erts/emulator/test/signal_SUITE.erl b/erts/emulator/test/signal_SUITE.erl
index adff299eddb5..c22296eae745 100644
--- a/erts/emulator/test/signal_SUITE.erl
+++ b/erts/emulator/test/signal_SUITE.erl
@@ -62,7 +62,8 @@
          simultaneous_signals_exit/1,
          simultaneous_signals_recv_exit/1,
          parallel_signal_enqueue_race_1/1,
-         parallel_signal_enqueue_race_2/1]).
+         parallel_signal_enqueue_race_2/1,
+         dirty_schedule/1]).
 
 -export([spawn_spammers/3]).
 
@@ -102,6 +103,7 @@ all() ->
      monitor_nodes_order,
      parallel_signal_enqueue_race_1,
      parallel_signal_enqueue_race_2,
+     dirty_schedule,
      {group, adjust_message_queue}].
 
 groups() ->
@@ -1308,6 +1310,82 @@ parallel_signal_enqueue_race_2_test() ->
     false = is_process_alive(R),
     ok.
 
+dirty_schedule(Config) when is_list(Config) ->
+    lists:foreach(fun (_) ->
+                          dirty_schedule_test()
+                  end,
+                  lists:seq(1, 5)),
+    ok.
+
+dirty_schedule_test() ->
+    %%
+    %% PR-7822 (second commit)
+    %%
+    %% This bug could occur when a process was to be scheduled due to
+    %% an incoming signal just as the receiving process was selected
+    %% for execution on a dirty scheduler. The process could then be
+    %% inserted into a run-queue at the same time as it began executing
+    %% on a dirty scheduler. If the scheduled instance was selected for
+    %% execution on one dirty scheduler at the same time as it was
+    %% scheduled out on another scheduler, a race could cause the thread
+    %% scheduling out the process to conclude that it was already in the
+    %% run-queue and did not need to be inserted, while the other thread
+    %% selecting it for execution dropped the process, since it was
+    %% already running on another scheduler. As a result, the process
+    %% ended up stuck in a runnable state, but not in the run-queue.
+    %%
+    %% When the bug was triggered, the receiver could end up in an
+    %% inconsistent state where it potentially would be stuck forever.
+    %%
+    %% The above scenario is very hard to trigger, so the test typically
+    %% does not fail even with the bug present, but we at least try to
+    %% massage the scenario...
+    %%
+    Proc = spawn_link(fun DirtyLoop() ->
+                              erts_debug:dirty_io(scheduler,type),
+                              DirtyLoop()
+                      end),
+    NoPs = lists:seq(1, erlang:system_info(schedulers_online)),
+    SpawnSender =
+        fun (Prio) ->
+                spawn_opt(
+                  fun () ->
+                          Loop = fun Loop (0) ->
+                                         ok;
+                                     Loop (N) ->
+                                         _ = process_info(Proc,
+                                                          current_function),
+                                         Loop(N-1)
+                                 end,
+                          receive go -> ok end,
+                          Loop(100000)
+                  end, [monitor,{priority,Prio}])
+        end,
+    Go = fun ({P, _M}) -> P ! go end,
+    WaitProcs = fun ({P, M}) ->
+                        receive {'DOWN', M, process, P, R} ->
+                                normal = R
+                        end
+                end,
+    PM1s = lists:map(fun (_) -> SpawnSender(normal) end, NoPs),
+    lists:foreach(Go, PM1s),
+    lists:foreach(WaitProcs, PM1s),
+    PM2s = lists:map(fun (_) -> SpawnSender(high) end, NoPs),
+    lists:foreach(Go, PM2s),
+    lists:foreach(WaitProcs, PM2s),
+    PM3s = lists:map(fun (N) ->
+                             Prio = case N rem 2 of
+                                        0 -> normal;
+                                        1 -> high
+                                    end,
+                             SpawnSender(Prio) end, NoPs),
+    lists:foreach(Go, PM3s),
+    lists:foreach(WaitProcs, PM3s),
+    unlink(Proc),
+    exit(Proc, kill),
+    false = is_process_alive(Proc),
+    ok.
+
 %%
 %% -- Internal utils --------------------------------------------------------
 %%
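
For reference, a minimal diagnostic sketch (not part of the patch): the symptom fixed above is a process that reports a runnable status yet never gets to run because it is missing from every run queue. Assuming a shell on a live node, a rough way to spot such a process is to check whether its reduction count advances while its status stays runnable. The module name stuck_check, the helper check_stuck/1, and the one-second window are illustrative choices, not part of OTP:

    %% Hedged sketch: flags a process that claims to be runnable but
    %% performs no reductions over a short window, which is the observable
    %% symptom of the bug fixed by this patch. Names and the sleep length
    %% are arbitrary; a heavily loaded system can legitimately look like
    %% this too, so treat the result as a hint only.
    -module(stuck_check).
    -export([check_stuck/1]).

    check_stuck(P) when is_pid(P) ->
        case erlang:process_info(P, [status, reductions]) of
            undefined ->
                dead;
            [{status, runnable}, {reductions, R1}] ->
                timer:sleep(1000),
                case erlang:process_info(P, [status, reductions]) of
                    [{status, runnable}, {reductions, R1}] ->
                        %% Runnable both times with an unchanged reduction
                        %% count: possibly stuck outside the run queues.
                        possibly_stuck;
                    _ ->
                        ok
                end;
            _ ->
                ok
        end.

Since the race is timing-dependent, the regression test above (dirty_schedule) remains the intended way to exercise the fix; this helper only inspects the symptom.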