Skip to content

Commit

Permalink
Merge branch 'maint'
Browse files Browse the repository at this point in the history
* maint:
  [erts] Fix long signal delivery time for processes in dirty run queues
  [erts] Fix enqueue into dirty run queues
  [erts] Fix fetch of signals in parallel enqueue race scenario
  • Loading branch information
rickard-green committed Nov 28, 2023
2 parents 463c8c1 + 1bb0131 commit aae49ba
Show file tree
Hide file tree
Showing 9 changed files with 474 additions and 103 deletions.
10 changes: 2 additions & 8 deletions erts/emulator/beam/beam_bif_load.c
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ BIF_RETTYPE erts_internal_check_dirty_process_code_2(BIF_ALIST_2)
{
erts_aint32_t state;
Process *rp;
int dirty, busy, reds = 0;
int busy, reds = 0;
Eterm res;

if (BIF_P != erts_dirty_process_signal_handler
Expand All @@ -652,13 +652,7 @@ BIF_RETTYPE erts_internal_check_dirty_process_code_2(BIF_ALIST_2)
BIF_RET(am_false);

state = erts_atomic32_read_nob(&rp->state);
dirty = (state & ERTS_PSFLG_DIRTY_RUNNING);
/*
* Ignore ERTS_PSFLG_DIRTY_RUNNING_SYS (see
* comment in erts_execute_dirty_system_task()
* in erl_process.c).
*/
if (!dirty)
if (!ERTS_PROC_IN_DIRTY_STATE(state))
BIF_RET(am_normal);

busy = erts_proc_trylock(rp, ERTS_PROC_LOCK_MAIN) == EBUSY;
Expand Down
5 changes: 1 addition & 4 deletions erts/emulator/beam/bif.c
Original file line number Diff line number Diff line change
Expand Up @@ -1706,9 +1706,7 @@ static Eterm process_flag_aux(Process *c_p, int *redsp, Eterm flag, Eterm val)
*/

state = erts_atomic32_read_nob(&c_p->state);
if (state & (ERTS_PSFLG_RUNNING_SYS
| ERTS_PSFLG_DIRTY_RUNNING_SYS
| ERTS_PSFLG_DIRTY_RUNNING)) {
if (!(state & ERTS_PSFLG_RUNNING)) {
/*
* We are either processing signals before
* being executed or executing dirty. That
Expand All @@ -1717,7 +1715,6 @@ static Eterm process_flag_aux(Process *c_p, int *redsp, Eterm flag, Eterm val)
*redsp = 1;
}
else {
ASSERT(state & ERTS_PSFLG_RUNNING);

/*
* F_DELAY_GC is currently only set when
Expand Down
1 change: 1 addition & 0 deletions erts/emulator/beam/erl_alloc.types
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ type SL_MPATHS SHORT_LIVED SYSTEM sl_migration_paths
type T2B_DETERMINISTIC SHORT_LIVED PROCESSES term_to_binary_deterministic

type DSIG_HNDL_NTFY SHORT_LIVED PROCESSES dirty_signal_handler_notification
type SCHD_SIG_NTFY SHORT_LIVED PROCESSES scheduled_signal_notify
type CODE_COVERAGE STANDARD SYSTEM code_coverage

#
Expand Down
125 changes: 101 additions & 24 deletions erts/emulator/beam/erl_proc_sig_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -918,7 +918,7 @@ notify_dirty_signal_handler(Eterm pid,
ErtsMessage *mp;
Process *sig_handler;

ASSERT(state & ERTS_PSFLG_DIRTY_RUNNING);
ASSERT(state & (ERTS_PSFLGS_DIRTY_WORK|ERTS_PSFLG_DIRTY_RUNNING));

if (prio < 0)
prio = (int) ERTS_PSFLGS_GET_USR_PRIO(state);
Expand Down Expand Up @@ -958,13 +958,10 @@ delayed_notify_dirty_signal_handler(void *vdshnp)
if (proc) {
erts_aint32_t state = erts_atomic32_read_acqb(&proc->state);
/*
* Notify the dirty signal handler if it is still running
* dirty and still have signals to handle...
* Notify the dirty signal handler if it is still scheduled
* or running dirty and still have signals to handle...
*/
if (!!(state & ERTS_PSFLG_DIRTY_RUNNING)
& !!(state & (ERTS_PSFLG_SIG_Q
| ERTS_PSFLG_NMSG_SIG_IN_Q
| ERTS_PSFLG_MSG_SIG_IN_Q))) {
if (ERTS_PROC_NEED_DIRTY_SIG_HANDLING(state)) {
notify_dirty_signal_handler(dshnp->pid, state, dshnp->prio);
}
}
Expand Down Expand Up @@ -1244,12 +1241,7 @@ maybe_elevate_sig_handling_prio(Process *c_p, int prio, Eterm other)
if (res) {
/* ensure handled if dirty executing... */
state = erts_atomic32_read_nob(&rp->state);
/*
* We ignore ERTS_PSFLG_DIRTY_RUNNING_SYS. For
* more info see erts_execute_dirty_system_task()
* in erl_process.c.
*/
if (state & ERTS_PSFLG_DIRTY_RUNNING)
if (ERTS_PROC_NEED_DIRTY_SIG_HANDLING(state))
erts_ensure_dirty_proc_signals_handled(rp, state,
min_prio, 0);
}
Expand All @@ -1258,6 +1250,15 @@ maybe_elevate_sig_handling_prio(Process *c_p, int prio, Eterm other)
return res;
}

typedef struct {
Eterm pid;
int nmsig;
int msig;
} ErtsSchedSignalNotify;

static void
sched_sig_notify(void *vssnp);

void
erts_proc_sig_fetch__(Process *proc,
ErtsSignalInQueueBufferArray *buffers,
Expand Down Expand Up @@ -1391,11 +1392,53 @@ erts_proc_sig_fetch__(Process *proc,
* future call to erts_proc_sig_fetch().
*/
if (erts_atomic32_read_nob(&buffers->nonmsgs_in_slots))
set_flags |= ERTS_PSFLG_ACTIVE_SYS|ERTS_PSFLG_NMSG_SIG_IN_Q;
set_flags |= ERTS_PSFLG_NMSG_SIG_IN_Q;
if (erts_atomic32_read_nob(&buffers->msgs_in_slots))
set_flags |= ERTS_PSFLG_ACTIVE|ERTS_PSFLG_MSG_SIG_IN_Q;
if (set_flags)
(void) erts_atomic32_read_bor_relb(&proc->state, set_flags);
set_flags |= ERTS_PSFLG_MSG_SIG_IN_Q;
if (set_flags) {
erts_aint32_t oflgs;
oflgs = erts_atomic32_read_bor_relb(&proc->state, set_flags);
if ((oflgs & (ERTS_PSFLG_NMSG_SIG_IN_Q
| ERTS_PSFLG_MSG_SIG_IN_Q)) != set_flags) {
int msig = 0, nmsig = 0;
/*
* We did set at least one of the flags; check if we may
* need to set corresponding active flag(s)...
*/
if ((!!(set_flags & ERTS_PSFLG_NMSG_SIG_IN_Q))
& (!(oflgs & (ERTS_PSFLG_NMSG_SIG_IN_Q
| ERTS_PSFLG_ACTIVE_SYS)))) {
/* We set nmsig-in-q flag and active-sys missing... */
nmsig = !0;
}
if ((!!(set_flags & ERTS_PSFLG_MSG_SIG_IN_Q))
& (!(oflgs & (ERTS_PSFLG_MSG_SIG_IN_Q
| ERTS_PSFLG_ACTIVE)))) {
/* We set msig-in-q flag and active missing... */
msig = !0;
}
if (msig | nmsig) {
/*
* We don't know exactly what locks we got, so
* we need to schedule the notification...
*/
ErtsSchedulerData *esdp = erts_get_scheduler_data();
int tid = (esdp && esdp->type == ERTS_SCHED_NORMAL
? (int) esdp->no
: 1);
ErtsSchedSignalNotify *ssnp =
(ErtsSchedSignalNotify *)
erts_alloc(ERTS_ALC_T_SCHD_SIG_NTFY,
sizeof(ErtsSchedSignalNotify));
ssnp->nmsig = nmsig;
ssnp->msig = msig;
ssnp->pid = proc->common.id;
erts_schedule_misc_aux_work(tid,
sched_sig_notify,
(void *) ssnp);
}
}
}
/* else:
* Another thread is currently operating on a buffer and
* will soon set appropriate.
Expand All @@ -1413,6 +1456,45 @@ erts_proc_sig_fetch__(Process *proc,

}

static void
sched_sig_notify(void *vssnp)
{
ErtsSchedSignalNotify *ssnp = (ErtsSchedSignalNotify *) vssnp;
Process *proc = erts_proc_lookup(ssnp->pid);
if (proc) {
erts_aint32_t state = erts_atomic32_read_acqb(&proc->state);
int nmsig = ssnp->nmsig;
int msig = ssnp->msig;
ASSERT(nmsig || msig);
if ((!!nmsig) & ((!(state & (ERTS_PSFLG_SIG_Q
| ERTS_PSFLG_NMSG_SIG_IN_Q)))
| (!!(state & ERTS_PSFLG_ACTIVE_SYS)))) {
/*
* Either already handled or someone else set the active-sys
* flag...
*/
nmsig = 0;
}
if ((!!msig) & (!!(state & ERTS_PSFLG_ACTIVE))) {
/*
* Someone else set the active flag (we cannot determine if it
* has been handled or not by looking at the state flag)...
*/
msig = 0;
}
if (msig|nmsig) {
if (!nmsig) {
erts_proc_notify_new_message(proc, 0);
}
else {
erts_aint32_t extra = msig ? ERTS_PSFLG_ACTIVE : 0;
erts_proc_notify_new_sig(proc, state, extra);
}
}
}
erts_free(ERTS_ALC_T_SCHD_SIG_NTFY, vssnp);
}

void
erts_proc_sig_destroy_unlink_op(ErtsSigUnlinkOp *sulnk)
{
Expand Down Expand Up @@ -8165,12 +8247,7 @@ erts_internal_dirty_process_handle_signals_1(BIF_ALIST_1)
BIF_RET(am_noproc);

state = erts_atomic32_read_acqb(&rp->state);
dirty = (state & ERTS_PSFLG_DIRTY_RUNNING);
/*
* Ignore ERTS_PSFLG_DIRTY_RUNNING_SYS (see
* comment in erts_execute_dirty_system_task()
* in erl_process.c).
*/
dirty = ERTS_PROC_IN_DIRTY_STATE(state);
if (!dirty)
BIF_RET(am_normal);

Expand All @@ -8184,7 +8261,7 @@ erts_internal_dirty_process_handle_signals_1(BIF_ALIST_1)

state = erts_atomic32_read_mb(&rp->state);
noproc = (state & ERTS_PSFLG_FREE);
dirty = (state & ERTS_PSFLG_DIRTY_RUNNING);
dirty = ERTS_PROC_NEED_DIRTY_SIG_HANDLING(state);

if (busy) {
if (noproc)
Expand Down
16 changes: 4 additions & 12 deletions erts/emulator/beam/erl_proc_sig_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -2004,29 +2004,21 @@ erts_proc_notify_new_sig(Process* rp, erts_aint32_t state,
state = erts_proc_sys_schedule(rp, state, enable_flag);
}

if (state & ERTS_PSFLG_DIRTY_RUNNING) {
/*
* We ignore ERTS_PSFLG_DIRTY_RUNNING_SYS. For
* more info see erts_execute_dirty_system_task()
* in erl_process.c.
*/
if (ERTS_PROC_IN_DIRTY_STATE(state)) {
erts_ensure_dirty_proc_signals_handled(rp, state, -1, 0);
}
}



ERTS_GLB_INLINE void
erts_proc_notify_new_message(Process *p, ErtsProcLocks locks)
{
/* No barrier needed, due to msg lock */
erts_aint32_t state = erts_atomic32_read_nob(&p->state);
if (!(state & ERTS_PSFLG_ACTIVE))
erts_schedule_process(p, state, locks);
if (state & ERTS_PSFLG_DIRTY_RUNNING) {
/*
* We ignore ERTS_PSFLG_DIRTY_RUNNING_SYS. For
* more info see erts_execute_dirty_system_task()
* in erl_process.c.
*/
if (ERTS_PROC_NEED_DIRTY_SIG_HANDLING(state)) {
erts_ensure_dirty_proc_signals_handled(p, state, -1, locks);
}
}
Expand Down
Loading

0 comments on commit aae49ba

Please sign in to comment.