diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names index c264082e7561..d122cac4ba4e 100644 --- a/erts/emulator/beam/atom.names +++ b/erts/emulator/beam/atom.names @@ -553,6 +553,7 @@ atom parent atom Plus='+' atom PlusPlus='++' atom pause +atom pause_proc_timer atom pending atom pending_driver atom pending_process @@ -615,6 +616,7 @@ atom reset atom reset_seq_trace atom restart atom resume +atom resume_proc_timer atom return_from atom return_to atom return_to_trace diff --git a/erts/emulator/beam/bif.tab b/erts/emulator/beam/bif.tab index c7aa2bff1c78..1b2823351a98 100644 --- a/erts/emulator/beam/bif.tab +++ b/erts/emulator/beam/bif.tab @@ -220,7 +220,7 @@ bif erlang:seq_trace_info/1 bif erlang:seq_trace_print/1 bif erlang:seq_trace_print/2 bif erts_internal:suspend_process/2 -bif erlang:resume_process/1 +bif erlang:resume_process/2 bif erts_internal:process_display/2 bif erlang:bump_reductions/1 diff --git a/erts/emulator/beam/erl_monitor_link.c b/erts/emulator/beam/erl_monitor_link.c index 59506b10510c..2d53dead491e 100644 --- a/erts/emulator/beam/erl_monitor_link.c +++ b/erts/emulator/beam/erl_monitor_link.c @@ -1048,6 +1048,7 @@ erts_monitor_create(Uint16 type, Eterm ref, Eterm orgn, Eterm trgt, Eterm name, msp->next = NULL; erts_atomic_init_relb(&msp->state, 0); + msp->ptimer_count = 0; erts_atomic32_init_nob(&mdp->refc, 2); break; } diff --git a/erts/emulator/beam/erl_monitor_link.h b/erts/emulator/beam/erl_monitor_link.h index d099800c9780..574da11e2b17 100644 --- a/erts/emulator/beam/erl_monitor_link.h +++ b/erts/emulator/beam/erl_monitor_link.h @@ -733,6 +733,7 @@ struct ErtsMonitorSuspend__ { ErtsMonitorData md; /* origin = suspender; target = suspendee */ ErtsMonitorSuspend *next; erts_atomic_t state; + int ptimer_count; }; #define ERTS_MSUSPEND_STATE_FLG_ACTIVE ((erts_aint_t) (((Uint) 1) << (sizeof(Uint)*8 - 1))) #define ERTS_MSUSPEND_STATE_COUNTER_MASK (~ERTS_MSUSPEND_STATE_FLG_ACTIVE) diff --git a/erts/emulator/beam/erl_proc_sig_queue.c b/erts/emulator/beam/erl_proc_sig_queue.c index ea0fa38848bd..3f19a1558e71 100644 --- a/erts/emulator/beam/erl_proc_sig_queue.c +++ b/erts/emulator/beam/erl_proc_sig_queue.c @@ -4992,6 +4992,19 @@ handle_process_info(Process *c_p, ErtsSigRecvTracing *tracing, return ((int) reds)*4 + 8; } +static void +activate_suspend_monitor(Process *c_p, ErtsMonitorSuspend *msp) +{ + erts_aint_t mstate; + + ASSERT(msp->ptimer_count == 0); + + mstate = erts_atomic_read_bor_acqb(&msp->state, + ERTS_MSUSPEND_STATE_FLG_ACTIVE); + ASSERT(!(mstate & ERTS_MSUSPEND_STATE_FLG_ACTIVE)); (void) mstate; + erts_suspend(c_p, ERTS_PROC_LOCK_MAIN, NULL); +} + static void handle_suspend(Process *c_p, ErtsMonitor *mon, int *yieldp) { @@ -5000,14 +5013,8 @@ handle_suspend(Process *c_p, ErtsMonitor *mon, int *yieldp) ASSERT(mon->type == ERTS_MON_TYPE_SUSPEND); if (!(state & ERTS_PSFLG_DIRTY_RUNNING)) { - ErtsMonitorSuspend *msp; - erts_aint_t mstate; - - msp = (ErtsMonitorSuspend *) erts_monitor_to_data(mon); - mstate = erts_atomic_read_bor_acqb(&msp->state, - ERTS_MSUSPEND_STATE_FLG_ACTIVE); - ASSERT(!(mstate & ERTS_MSUSPEND_STATE_FLG_ACTIVE)); (void) mstate; - erts_suspend(c_p, ERTS_PROC_LOCK_MAIN, NULL); + ErtsMonitorSuspend *msp = (ErtsMonitorSuspend *) erts_monitor_to_data(mon); + activate_suspend_monitor(c_p, msp); *yieldp = !0; } else { @@ -5213,12 +5220,7 @@ erts_proc_sig_handle_pending_suspend(Process *c_p) msp->next = NULL; if (!(state & ERTS_PSFLG_EXITING) && erts_monitor_is_in_table(&msp->md.u.target)) { - erts_aint_t mstate; - - mstate = erts_atomic_read_bor_acqb(&msp->state, - ERTS_MSUSPEND_STATE_FLG_ACTIVE); - ASSERT(!(mstate & ERTS_MSUSPEND_STATE_FLG_ACTIVE)); (void) mstate; - erts_suspend(c_p, ERTS_PROC_LOCK_MAIN, NULL); + activate_suspend_monitor(c_p, msp); } erts_monitor_release(&msp->md.u.target); diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index 0798b8ea650b..754630fd9de4 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -8896,6 +8896,11 @@ erts_start_schedulers(void) } } +static Eterm +sched_pause_proc_timer(Process *c_p, void *vst, int *redsp, ErlHeapFragment **bp); +static Eterm +sched_resume_paused_proc_timer(Process *c_p, void *vst, int *redsp, ErlHeapFragment **bp); + BIF_RETTYPE erts_internal_suspend_process_2(BIF_ALIST_2) { @@ -8906,6 +8911,7 @@ erts_internal_suspend_process_2(BIF_ALIST_2) int sync = 0; int async = 0; int unless_suspending = 0; + int pause_proc_timer = 0; erts_aint_t mstate; ErtsMonitorSuspend *msp; ErtsMonitorData *mdp; @@ -8930,6 +8936,9 @@ erts_internal_suspend_process_2(BIF_ALIST_2) case am_asynchronous: async = 1; break; + case am_pause_proc_timer: + pause_proc_timer = 1; + break; default: { if (is_tuple_arity(arg, 2)) { Eterm *tp = tuple_val(arg); @@ -9029,15 +9038,35 @@ erts_internal_suspend_process_2(BIF_ALIST_2) sync = !async; } else { - noproc: - erts_monitor_tree_delete(&ERTS_P_MONITORS(BIF_P), &mdp->origin); - erts_monitor_release_both(mdp); - if (!async) - res = am_badarg; + goto noproc; + } + } + } + + if (pause_proc_timer) { + int proc_timer_already_paused = msp->ptimer_count++; + + if (!proc_timer_already_paused) { + Eterm res; + res = erts_proc_sig_send_rpc_request(BIF_P, + BIF_ARG_1, + 0, /* no reply */ + sched_pause_proc_timer, + NULL); + if (is_non_value(res)) { + goto noproc; } } } + while(0) { + noproc: + erts_monitor_tree_delete(&ERTS_P_MONITORS(BIF_P), &mdp->origin); + erts_monitor_release_both(mdp); + if (!async) + res = am_badarg; + } + if (sync) { ASSERT(is_non_value(reply_tag)); reply_res = res; @@ -9052,22 +9081,43 @@ erts_internal_suspend_process_2(BIF_ALIST_2) } /* - * The erlang:resume_process/1 BIF + * The erlang:resume_process/2 BIF */ BIF_RETTYPE -resume_process_1(BIF_ALIST_1) +resume_process_2(BIF_ALIST_2) { ErtsMonitor *mon; ErtsMonitorSuspend *msp; erts_aint_t mstate; - + int prev_suspend_count; + int resume_proc_timer = 0; + if (BIF_P->common.id == BIF_ARG_1) BIF_ERROR(BIF_P, BADARG); if (!is_internal_pid(BIF_ARG_1)) BIF_ERROR(BIF_P, BADARG); + if (is_not_nil(BIF_ARG_2)) { + /* Parse option list */ + Eterm arg = BIF_ARG_2; + while (is_list(arg)) { + Eterm *lp = list_val(arg); + arg = CAR(lp); + switch (arg) { + case am_resume_proc_timer: + resume_proc_timer = 1; + break; + default: + BIF_ERROR(BIF_P, BADARG); + } + arg = CDR(lp); + } + if (is_not_nil(arg)) + BIF_ERROR(BIF_P, BADARG); + } + mon = erts_monitor_tree_lookup(ERTS_P_MONITORS(BIF_P), BIF_ARG_1); if (!mon) { @@ -9078,18 +9128,56 @@ resume_process_1(BIF_ALIST_1) ASSERT(mon->type == ERTS_MON_TYPE_SUSPEND); msp = (ErtsMonitorSuspend *) erts_monitor_to_data(mon); + if (resume_proc_timer && msp->ptimer_count == 0) { + BIF_ERROR(BIF_P, BADARG); + } + mstate = erts_atomic_dec_read_relb(&msp->state); + prev_suspend_count = mstate & ERTS_MSUSPEND_STATE_COUNTER_MASK; + + ASSERT(prev_suspend_count >= 0); - ASSERT((mstate & ERTS_MSUSPEND_STATE_COUNTER_MASK) >= 0); + if (msp->ptimer_count == prev_suspend_count + 1 && !resume_proc_timer) { + erts_atomic_inc_nob(&msp->state); + BIF_ERROR(BIF_P, BADARG); + } - if ((mstate & ERTS_MSUSPEND_STATE_COUNTER_MASK) == 0) { + if (prev_suspend_count == 0) { erts_monitor_tree_delete(&ERTS_P_MONITORS(BIF_P), mon); erts_proc_sig_send_demonitor(&BIF_P->common, BIF_P->common.id, 0, mon); } + if (resume_proc_timer) { + int needs_to_resume_timer = --msp->ptimer_count == 0; + if (needs_to_resume_timer) { + erts_proc_sig_send_rpc_request(BIF_P, + BIF_ARG_1, + 0, /* no reply */ + sched_resume_paused_proc_timer, + NULL); + } + } + BIF_RET(am_true); } +static Eterm +sched_pause_proc_timer(Process *c_p, void *vst, int *redsp, ErlHeapFragment **bp) +{ + erts_pause_proc_timer(c_p); + *redsp = 1; + return THE_NON_VALUE; +} + +static Eterm +sched_resume_paused_proc_timer(Process *c_p, void *vst, int *redsp, ErlHeapFragment **bp) +{ + erts_resume_paused_proc_timer(c_p); + *redsp = 1; + return THE_NON_VALUE; +} + + BIF_RETTYPE erts_internal_is_process_executing_dirty_1(BIF_ALIST_1) { diff --git a/erts/emulator/test/process_SUITE.erl b/erts/emulator/test/process_SUITE.erl index 9680eb5836fd..4d884ec49a76 100644 --- a/erts/emulator/test/process_SUITE.erl +++ b/erts/emulator/test/process_SUITE.erl @@ -24,6 +24,7 @@ %% exit/1 %% exit/2 %% process_info/1,2 +%% suspend_process/2 (partially) %% register/2 (partially) -include_lib("stdlib/include/assert.hrl"). @@ -56,6 +57,10 @@ process_info_msgq_len_no_very_long_delay/1, process_info_dict_lookup/1, process_info_label/1, + suspend_process_pausing_proc_timer/1, + suspend_process_pausing_proc_timer_after_suspended/1, + resume_process_resuming_proc_timer_can_resume_timer_early/1, + suspend_process_pausing_proc_needs_balanced_resume_procs/1, bump_reductions/1, low_prio/1, binary_owner/1, yield/1, yield2/1, otp_4725/1, dist_unlink_ack_exit_leak/1, bad_register/1, garbage_collect/1, otp_6237/1, @@ -131,6 +136,7 @@ all() -> otp_6237, {group, spawn_request}, {group, process_info_bif}, + {group, suspend_process_bif}, {group, processes_bif}, {group, otp_7738}, garb_other_running, {group, system_task}, @@ -185,6 +191,11 @@ groups() -> process_info_msgq_len_no_very_long_delay, process_info_dict_lookup, process_info_label]}, + {suspend_process_bif, [], + [suspend_process_pausing_proc_timer, + suspend_process_pausing_proc_timer_after_suspended, + resume_process_resuming_proc_timer_can_resume_timer_early, + suspend_process_pausing_proc_needs_balanced_resume_procs]}, {otp_7738, [], [otp_7738_waiting, otp_7738_suspended, otp_7738_resume]}, @@ -1775,6 +1786,144 @@ proc_dict_helper() -> end, proc_dict_helper(). +suspend_process_pausing_proc_timer(_Config) -> + BeforeSuspend = fun(_Pid) -> ok end, + AfterResume = fun(_Pid) -> ok end, + suspend_process_pausing_proc_timer_aux(BeforeSuspend, AfterResume), + ok. + +suspend_process_pausing_proc_timer_after_suspended(_Config) -> + % We suspend the process once before using pause_proc_timer + BeforeSuspend = fun(Pid) -> true = erlang:suspend_process(Pid) end, + AfterResume = fun(Pid) -> true = erlang:resume_process(Pid) end, + suspend_process_pausing_proc_timer_aux(BeforeSuspend, AfterResume), + ok. + +suspend_process_pausing_proc_timer_aux(BeforeSuspend, AfterResume) -> + TcProc = self(), + Pid = erlang:spawn_link( + fun() -> + TcProc ! {sync, self()}, + receive go -> ok + after 2_000 -> exit(timer_not_paused) + end, + TcProc ! {sync, self()}, + receive _ -> error(unexpected) + after 2_000 -> ok + end, + TcProc ! {sync, self()} + end + ), + + WaitForSync = fun () -> + receive {sync, Pid} -> ok + after 10_000 -> error(timeout) + end + end, + EnsureWaiting = fun() -> + wait_until(fun () -> process_info(Pid, status) == {status, waiting} end) + end, + + WaitForSync(), + EnsureWaiting(), + + BeforeSuspend(Pid), + true = erlang:suspend_process(Pid, [pause_proc_timer]), + timer:sleep(5_000), + true = erlang:resume_process(Pid, [resume_proc_timer]), + AfterResume(Pid), + timer:sleep(1_000), + Pid ! go, + + WaitForSync(), + EnsureWaiting(), + + BeforeSuspend(Pid), + true = erlang:suspend_process(Pid, [pause_proc_timer]), + true = erlang:resume_process(Pid, [resume_proc_timer]), + AfterResume(Pid), + WaitForSync(), + ok. + +resume_process_resuming_proc_timer_can_resume_timer_early(_Config) -> + TcProc = self(), + Pid = erlang:spawn_link( + fun() -> + TcProc ! {sync, self()}, + receive go -> error(received_go) + after 2_000 -> TcProc ! {sync, self()} + end + end + ), + + WaitForSync = fun () -> + receive {sync, Pid} -> ok + after 10_000 -> error(timeout) + end + end, + EnsureWaiting = fun() -> + wait_until(fun () -> process_info(Pid, status) == {status, waiting} end) + end, + + + WaitForSync(), + EnsureWaiting(), + + % Suspend twice, but pause the proc timer only once + true = erlang:suspend_process(Pid), + true = erlang:suspend_process(Pid, [pause_proc_timer]), + + % Pid is suspended so will not process it just yet + Pid ! go, + + % At this point the process is still suspended but the timer is running again + true = erlang:resume_process(Pid, [resume_proc_timer]), + ?assertEqual({status, suspended}, process_info(Pid, status)), + + % The timer must have expired by now + timer:sleep(5_000), + + true = erlang:resume_process(Pid), + WaitForSync(), + + ok. + +suspend_process_pausing_proc_needs_balanced_resume_procs(_Config) -> + Pid = erlang:spawn_link(timer, sleep, [infinity]), + + true = erlang:suspend_process(Pid), + ?assertEqual({status, suspended}, process_info(Pid, status)), + + % No pause_proc_timer so far, so fail + ?assertMatch({'EXIT', {badarg, _}}, + catch erlang:resume_process(Pid, [resume_proc_timer])), + ?assertEqual({status, suspended}, process_info(Pid, status)), + + + true = erlang:suspend_process(Pid), + true = erlang:suspend_process(Pid, [pause_proc_timer]), + true = erlang:suspend_process(Pid, [pause_proc_timer]), + + % It is ok to do out-of-order resumes; here one that doesn't resume the timer + true = erlang:resume_process(Pid), + ?assertEqual({status, suspended}, process_info(Pid, status)), + + % Do more resumes, in any order + true = erlang:resume_process(Pid, [resume_proc_timer]), + true = erlang:resume_process(Pid), + ?assertEqual({status, suspended}, process_info(Pid, status)), + + % Only one suspend remains, and it used pause_proc_timer, so fail if not resuming timer + ?assertMatch({'EXIT', {badarg, _}}, + catch erlang:resume_process(Pid)), + ?assertEqual({status, suspended}, process_info(Pid, status)), + + % Final resume, now running + true = erlang:resume_process(Pid, [resume_proc_timer]), + ?assertEqual({status, running}, process_info(Pid, status)), + + ok. + %% Tests erlang:bump_reductions/1. bump_reductions(Config) when is_list(Config) -> erlang:garbage_collect(), @@ -3139,7 +3288,7 @@ spawn_huge_arglist_test(Local, Node, ArgList) -> {'DOWN', R2, process, Pid2, Reason2} -> ArgList = Reason2 end, - + {Pid3, R3} = case Local of true -> spawn_opt(?MODULE, huge_arglist_child, ArgList, [monitor]); diff --git a/erts/preloaded/ebin/erlang.beam b/erts/preloaded/ebin/erlang.beam index aed9bed68383..7f3e5241be2f 100644 Binary files a/erts/preloaded/ebin/erlang.beam and b/erts/preloaded/ebin/erlang.beam differ diff --git a/erts/preloaded/ebin/erts_internal.beam b/erts/preloaded/ebin/erts_internal.beam index 928de46229df..26e3dffa64b6 100644 Binary files a/erts/preloaded/ebin/erts_internal.beam and b/erts/preloaded/ebin/erts_internal.beam differ diff --git a/erts/preloaded/src/erlang.erl b/erts/preloaded/src/erlang.erl index 3c950c0a2f53..c6d850833b02 100644 --- a/erts/preloaded/src/erlang.erl +++ b/erts/preloaded/src/erlang.erl @@ -467,7 +467,7 @@ A list of binaries. This datatype is useful to use together with -export([process_flag/3, process_info/1, processes/0, purge_module/1]). -export([put/2, raise/3, read_timer/1, read_timer/2, ref_to_list/1, register/2]). -export([send_after/3, send_after/4, start_timer/3, start_timer/4]). --export([registered/0, resume_process/1, round/1, self/0]). +-export([registered/0, resume_process/1, resume_process/2, round/1, self/0]). -export([seq_trace/2, seq_trace_print/1, seq_trace_print/2, setnode/2]). -export([setnode/3, size/1, spawn/3, spawn_link/3, split_binary/2]). -export([suspend_process/2, system_monitor/0]). @@ -5471,6 +5471,21 @@ registered() -> %% resume_process/1 -doc """ Decreases the suspend count on the process identified by `Suspendee`. +Equivalent to calling [`erlang:resume_process(Suspendee, [])`](`resume_process/2`). + +> #### Warning {: .warning } +> +> This BIF is intended for debugging only. +""". +-doc #{ group => processes }. +-spec resume_process(Suspendee) -> true when + Suspendee :: pid(). +resume_process(Suspendee) -> + resume_process(Suspendee, []). + +%% resume_process/2 +-doc """ +Decreases the suspend count on the process identified by `Suspendee`. `Suspendee` is previously to have been suspended through [`erlang:suspend_process/2`](`suspend_process/2`) or @@ -5479,6 +5494,11 @@ Decreases the suspend count on the process identified by `Suspendee`. reaches zero, `Suspendee` is resumed, that is, its state is changed from suspended into the state it had before it was suspended. +Options (`Opt`s): + +- **`resume_proc_timer`** - Decrease the paused time count. If it reaches + zero, the timer will be resumed. + > #### Warning {: .warning } > > This BIF is intended for debugging only. @@ -5491,14 +5511,25 @@ Failures: previously increased the suspend count on the process identified by `Suspendee`. +- **`badarg`** - If the `resume_proc_timer` `Opt` is given, but the paused + timer count is already 0; or if it was not given and the paused timer + counte equals the suspended count. Intuitively, the usages of the + `pause_proc_timer` option of [`suspend_process/2`] and `resume_proc_timer` + need to balance out. + - **`badarg`** - If the process identified by `Suspendee` is not alive. + +- **`badarg`** - If `OptList` is not a proper list of valid `Opt`s. """. -doc #{ group => processes }. --spec resume_process(Suspendee) -> true when - Suspendee :: pid(). -resume_process(_Suspendee) -> +-spec resume_process(Suspendee, OptList) -> true when + Suspendee :: pid(), + OptList :: [Opt], + Opt :: resume_proc_timer. +resume_process(_Suspendee, _OptList) -> erlang:nif_error(undefined). + %% round/1 %% Shadowed by erl_bif_types: erlang:round/1 -doc """ @@ -5850,6 +5881,11 @@ Options (`Opt`s): Apart from the reply message, the `{asynchronous, ReplyTag}` option behaves exactly the same as the `asynchronous` option without reply tag. +- **`pause_proc_timer`** - If `Suspendee` is waiting on a message, pause the timer + associated with the `after` clause. The paused timer count is increased, so + a corresponding call to [`resume_process/2`] will need to use the `resume_proc_timer` + option to decrease it. + - **`unless_suspending`** - The process identified by `Suspendee` is suspended unless the calling process already is suspending `Suspendee`. If `unless_suspending` is combined with option `asynchronous`, a suspend request @@ -5896,7 +5932,8 @@ Failures: -spec suspend_process(Suspendee, OptList) -> boolean() when Suspendee :: pid(), OptList :: [Opt], - Opt :: unless_suspending | asynchronous | {asynchronous, term()}. + Opt :: unless_suspending | pause_proc_timer + | asynchronous | {asynchronous, term()}. suspend_process(Suspendee, OptList) -> case case erts_internal:suspend_process(Suspendee, OptList) of Ref when erlang:is_reference(Ref) -> diff --git a/erts/preloaded/src/erts_internal.erl b/erts/preloaded/src/erts_internal.erl index e0aaf5ee6af8..10e6204a041c 100644 --- a/erts/preloaded/src/erts_internal.erl +++ b/erts/preloaded/src/erts_internal.erl @@ -837,7 +837,8 @@ gather_carrier_info(_) -> Result :: boolean() | 'badarg' | reference(), Suspendee :: pid(), OptList :: [Opt], - Opt :: unless_suspending | asynchronous | {asynchronous, term()}. + Opt :: unless_suspending | pause_proc_timer + | asynchronous | {asynchronous, term()}. suspend_process(_Suspendee, _OptList) -> erlang:nif_error(undefined).