[PULP-218] Add timeout to immediate tasks #6155
Conversation
pulpcore/tasking/tasks.py
Outdated
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(IMMEDIATE_TASK_TIMEOUT)
Oh, interesting.
This is quite close to low level process handling. I wonder if this might interfere with other uses of the signal handler.
Do you think we can instead require that all immediate tasks are async functions? We could then implement the timeout on the asyncio level with much more confidence.
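The asyncio-level timeout suggested here could look roughly like the following sketch. The names (`IMMEDIATE_TIMEOUT`, `run_with_timeout`, `quick_task`) are illustrative, not the actual pulpcore implementation:

```python
import asyncio

IMMEDIATE_TIMEOUT = 5  # seconds; illustrative value

async def run_with_timeout(coro, timeout=IMMEDIATE_TIMEOUT):
    # asyncio.wait_for cancels the wrapped coroutine on timeout, without
    # touching process-wide state like the SIGALRM signal handler.
    return await asyncio.wait_for(coro, timeout=timeout)

async def quick_task():
    await asyncio.sleep(0.01)
    return "done"

# A fast task completes normally.
assert asyncio.run(run_with_timeout(quick_task())) == "done"

# A task exceeding the timeout is cancelled and raises asyncio.TimeoutError.
try:
    asyncio.run(run_with_timeout(asyncio.sleep(60), timeout=0.05))
except asyncio.TimeoutError:
    pass
```

Unlike `signal.alarm`, this confines the timeout to the one coroutine being supervised, so it cannot collide with other users of signal handlers in the process.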
Do you think we can instead require that all immediate tasks are async functions? We could then implement the timeout on the asyncio level with much more confidence.
Yes. I don't have a good idea of how much friction this imposes, but that's definitely safer.
@mdellweg Hey! Can you provide an early review for the api worker part?
pulpcore/tasking/tasks.py
Outdated
loop.run_until_complete(result)
if immediate:
    try:
        loop.run_until_complete(asyncio.wait_for(result, timeout=IMMEDIATE_TIMEOUT))
Looks like a good first cut. But we still stop the heartbeat loop for up to 5 seconds here.
We should definitely plan on rewriting the workers in async.
We should definitely plan on rewriting the workers in async.
Would you say that's a blocker (for running on foreground)?
I was trying to find a way to integrate the coroutine control yielding with the select-loop, but I'm reaching dark corners.
If that's a blocker, this PR could really be only about the timeout (as originally intended). The commit "wip: add timeout to supervise immediate tasks" adds a timeout to the regular task process. Maybe we could stop there.
Not necessarily. It's a certain risk. But it's nothing we cannot revert when things go bad.
Maybe after all it's wise to split the work into smaller chunks to reduce that risk.
Ensuring that immediate tasks don't run too long is still an incremental improvement.
We cannot require all immediate tasks to be async without properly deprecating it for plugins.
No idea why this fails only on s3: An exception does get caught...
Not sure if you wanted a review at this stage...
pulpcore/constants.py
Outdated
@@ -45,6 +45,8 @@
#: Tasks in an incomplete state have not finished their work yet.
TASK_INCOMPLETE_STATES = (TASK_STATES.WAITING, TASK_STATES.RUNNING, TASK_STATES.CANCELING)

#: Timeout for immediate tasks
Is that seconds?
Yes, I'll add that to the description.
pulpcore/tasking/tasks.py
Outdated
@@ -50,7 +52,6 @@ def wakeup_worker():


def execute_task(task):
    # This extra stack is needed to isolate the current_task ContextVar
Why drop this comment?
I guess that was an accident
Absolutely not. But then it doesn't seem to be telling its story.
I mean dropping the comment was an accident.
The comment is fine. I didn't know what a ContextVar was when I first read it, but then I read about it and it made sense.
Ohhh. Thanks for the clarification...
pulpcore/tasking/tasks.py
Outdated
if asyncio.iscoroutine(result):
immediate = task.immediate
is_coroutine = asyncio.iscoroutine(result)
Has this ever been true? Should we not check if func is a coroutine even before executing it?
Gerrod showed that this is valid here. But I can look up a more intuitive form to check it.
The big problem is that we run the task function first and then we check if it returned something awaitable.
(That is what coroutine functions are: They are functions returning an awaitable generator object.)
While in the old code this may be working properly (unless some task accidentally returned a generator of any sort), here we don't want to run the task function in case it is immediate and not async.
So what should happen, I think, is we should check for asyncio.iscoroutinefunction(func) and then decide whether or how to execute it.
Sadly the python documentation itself uses the term "coroutine" a bit inconsistently.
Sometimes as the function declared with async, and sometimes as the awaitable object that is the result of calling the former.
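The distinction being discussed can be made concrete. `asyncio.iscoroutinefunction` inspects the function itself, while `asyncio.iscoroutine` only recognizes the object produced by calling it:

```python
import asyncio

async def work():
    return 42

# The function declared with `async def` is a coroutine function...
assert asyncio.iscoroutinefunction(work)
assert not asyncio.iscoroutine(work)

# ...and calling it returns a coroutine object without running any body code yet.
coro = work()
assert asyncio.iscoroutine(coro)
assert not asyncio.iscoroutinefunction(coro)

# Only awaiting it (here via asyncio.run) actually executes the body.
assert asyncio.run(coro) == 42
```

This is why checking `iscoroutinefunction(func)` before dispatch lets the code decide how to execute the task without calling it first.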
pulpcore/tasking/worker.py
Outdated
@@ -388,6 +390,7 @@ def supervise_task(self, task):
task.save(update_fields=["worker"])
cancel_state = None
cancel_reason = None
time_limit = time.monotonic() + IMMEDIATE_TIMEOUT
Why do we not use timezone.now here?
Also didn't we want to let the timeout tick in the async loop?
That might eventually even let us run more than one immediate task per worker.
Also the timeout will be in effect even on the api worker.
Oh, that's true 🤦
The worker ends up calling execute_task, so we only need it there with the async loop...
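As background on the timezone.now question: time.monotonic is the appropriate clock for measuring a duration, because it never jumps with wall-clock adjustments. A minimal deadline-loop sketch (names and intervals illustrative, not the pulpcore supervise loop itself):

```python
import time

def wait_with_deadline(poll, timeout):
    # A monotonic deadline is immune to NTP corrections and DST changes,
    # which is why time.monotonic() is preferred over timezone.now() for
    # enforcing a time limit.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if poll():
            return True
        time.sleep(0.01)
    return False

assert wait_with_deadline(lambda: True, timeout=1.0) is True
assert wait_with_deadline(lambda: False, timeout=0.05) is False
```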
)
wait_until("running", task_href)

# Case 1
Each case should be a separate test. If you want to share resources, you can group them in a class.
AND it takes longer than timeout
THEN an error is raised
"""
COMMON_RESOURCE = "PPP"
I'd make the resource a random string to avoid any collisions.
Also why not mark this as parallel too?
# Case 3
dispatch_long_task(requires_resource=COMMON_RESOURCE)
with pytest.raises(PulpTaskError) as ctx:
class TestImmediateTaskWithLocking:
How about TestImmediateTaskWithBlockedResource?
When removing the "test" noise, the actual (first) test will read:
"Immediate task with blocked resource executes in task worker." Exactly what we want to read here...
THEN the task completes with a worker
"""
COMMON_RESOURCE = "MMM"
dispatch_long_task(required_resources=[COMMON_RESOURCE])
You could cancel the long task after dispatching the regular one in order to speed up the tests dramatically.
# Case 2
with pytest.raises(PulpTaskError) as ctx:
class TestImmediateTaskNoLocking:
TestImmediateTaskWithNoResource?
class TestImmediateTaskNoLocking:

@pytest.mark.parallel
def test_succeed_on_api_worker(self, pulpcore_bindings, dispatch_task, monitor_task):
This is really nit picky...
test_succeeds_on_api_worker
def test_immediate_task_requires_resource(pulpcore_bindings, dispatch_task, monitor_task):
    """
    GIVEN an async task requiring busy resources
task = monitor_task(task_href)
Should we monitor that task? Should it not return as completed already from dispatch_task? Also we should be able to catch the deprecation warning here and assert on it.
Also we don't want it to bubble into the collected deprecation warnings for the ci.
_logger.debug(_("Task is coroutine %s"), task.pk)
coro = func(*args, **kwargs)
Not sure if it's wise to call this thing coro given the confusion with that name throughout the python docs.
The only idea I can come up with is "awaitable".
The usage is a bit confusing but there is a clear definition.
Do you think we can try to be strict about those terms?
IMO "awaitable" is too broad, we really expect a coroutine object, which is returned by a coroutine function.
But I can change if you still think that would be confusing.
It's probably ok.
pulpcore/tasking/tasks.py
Outdated
coro = result

coro = asyncio.wait_for(coro, timeout=IMMEDIATE_TIMEOUT)
assert asyncio.iscoroutine(coro), coro
Extra safety? (I'm almost certain we can be sure of that now...)
assert asyncio.iscoroutine(coro), coro
Let's not add tests for the interpreter to the production code.
pulpcore/pytest_plugin.py
Outdated
The stderr can be useful to write asserts over log messages.
This is required because the pytest caplog fixture can't capture log messages
streamed from the pulpcore-manager call.
ohhhh!
Now I'm wondering if we could just use the capsys or capfd fixture instead? (If you want to try that.)
(I'd prefer not adding another fixture to do almost the same thing.)
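For reference, the reason capfd (unlike caplog) can see this output is that it captures at the file-descriptor level, so writes from a child process are visible too. The same idea can be illustrated outside pytest by capturing the child's stderr directly (the child command here is a stand-in for a pulpcore-manager call):

```python
import subprocess
import sys

# Run a child process that writes a warning to stderr, standing in for a
# pulpcore-manager invocation emitting a deprecation message.
proc = subprocess.run(
    [sys.executable, "-c", "import sys; sys.stderr.write('deprecation warning\\n')"],
    capture_output=True,
    text=True,
)
assert "deprecation warning" in proc.stderr
```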
I think the implementation is sound at this stage...
But still some things came up when reading through.
pulpcore/tasking/tasks.py
Outdated
except asyncio.TimeoutError:
    raise RuntimeError(
        _("Immediate task timed out after {} seconds").format(IMMEDIATE_TIMEOUT)
    )
This try block really only translates the timeout error to add more information, right?
If so, please add the task uuid also, and shove something in to the log stream of the worker.
Yes, this was supposed to add more info.
But it'll be handled by the outer try/catch-all at L107 (from changed code) and receive all the context info.
Or am I missing something?
Right. But I think we can still add something like:
logger.warn("Immediate task {task.pk} timed out after ...")
So we see it in the worker and api worker logs too.
Oh, ok. I've missed that comment
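Combining the two points above, logging before translating the error could look roughly like this sketch (function and logger names are illustrative; a short timeout is used so the demo runs quickly):

```python
import asyncio
import logging

logging.basicConfig()
_logger = logging.getLogger("pulpcore.tasking")
IMMEDIATE_TIMEOUT = 0.05  # shortened for the demo; the real value is in seconds

async def execute_immediate(coro, task_pk):
    try:
        return await asyncio.wait_for(coro, timeout=IMMEDIATE_TIMEOUT)
    except asyncio.TimeoutError:
        # Log before translating the exception, so the event shows up in the
        # worker's log stream in addition to being recorded on the task.
        _logger.warning(
            "Immediate task %s timed out after %s seconds", task_pk, IMMEDIATE_TIMEOUT
        )
        raise RuntimeError(
            f"Immediate task {task_pk} timed out after {IMMEDIATE_TIMEOUT} seconds"
        )

try:
    asyncio.run(execute_immediate(asyncio.sleep(60), task_pk="0123"))
except RuntimeError as e:
    print(e)  # Immediate task 0123 timed out after 0.05 seconds
```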
task_href = dispatch_task(
    "pulpcore.app.tasks.test.asleep", args=(LT_TIMEOUT,), immediate=True
)
Incidentally, these kinds of tasks are now a big driver to wish for asgi pulp instead of wsgi.
raise RuntimeError("Timeout waiting for task to transition")

@contextmanager
def _dispatch_long_task(required_resources: list[str], duration=5):
Let us stick to the established names exclusive_resources and shared_resources here.
args=(duration,),
exclusive_resources=required_resources,
)
wait_until("running", task_href)
I don't see a reason to wait for anything else than an unfinished task in the queue.
My reasoning was that for the long task to block the immediate one, it must have been picked up first. So it could be running or canceling, but not waiting (as we have priority on immediate tasks, it would be picked up first). So, is ["running", "canceling"] ok?
No, that's not true. A task blocks a resource even in waiting state.
@pytest.fixture
def dispatch_long_task(pulpcore_bindings, dispatch_task):
Now this fixture sounds like it should be called resource_blocker.
That we use a task for it is merely an implementation detail.
task = pulpcore_bindings.TasksApi.read(task_href)
if task.state == "running":
    pulpcore_bindings.TasksApi.tasks_cancel(task_href, {"state": "canceled"})
Suggested change (replace the read and state check with an unconditional cancel):
pulpcore_bindings.TasksApi.tasks_cancel(task_href, {"state": "canceled"})
No need to be polite to that task. Just make sure it goes away.
Maybe we even require it to use a non existing plugin so it will never be picked up at all.
If the task has already finished this call will raise a 409 error on the pulpcore client.
E.g:
task_href = dispatch_task("pulpcore.app.tasks.test.sleep", args=(2,))
time.sleep(3)
pulpcore_bindings.TasksApi.tasks_cancel(task_href, {"state": "canceled"}) # <- raises
But I can just suppress that error instead, didn't think about that before.
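Suppressing the conflict could be as simple as the following sketch. Here `ApiException` and `tasks_cancel` are hypothetical stand-ins for the bindings' exception type and cancel call; the `status` attribute is an assumption about the client's error object:

```python
class ApiException(Exception):
    """Stand-in for the bindings' exception; carries an HTTP status."""

    def __init__(self, status):
        super().__init__(f"HTTP {status}")
        self.status = status


def tasks_cancel(task_href, body):
    # Hypothetical stand-in for pulpcore_bindings.TasksApi.tasks_cancel:
    # here it always raises 409, as if the task had already finished.
    raise ApiException(409)


def force_cancel(task_href):
    # Cancel unconditionally; a 409 Conflict just means the task already
    # reached a final state, which is what we wanted anyway.
    try:
        tasks_cancel(task_href, {"state": "canceled"})
    except ApiException as e:
        if e.status != 409:
            raise


force_cancel("/pulp/api/v3/tasks/0123/")  # completes without raising
```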
assert task.worker is not None

@pytest.mark.parallel
def test_throws_when_non_deferrable(
This is not a newly defined behaviour. Can you do me a favor and see whether we already have a test for this?
(Ideally we combine them or pick the nicer one...)
Looks like we don't test it elsewhere:
> git grep -I -n defer -- pulpcore/tests/
pulpcore/tests/functional/api/test_tasking.py:564: def test_throws_when_non_deferrable(
pulpcore/tests/functional/api/test_tasking.py:569: WHEN dispatching as immediate and not deferrable
pulpcore/tests/functional/api/test_tasking.py:578: deferred=False,
assert "Resources temporarily unavailable." in task.error["reason"]

@pytest.mark.parallel
def test_timeouts_on_task_worker(
Nitpick mode: test_times_out_on_task_worker reads more English to me.
assert "Support for non-coroutine immediate tasks will be dropped" in stderr
assert "Support for non-coroutine immediate tasks will be dropped" in stderr_content
Thank you! Not adding a new fixture for this makes me happier.
args=(duration,),
exclusive_resources=exclusive_resources,
)
wait_until(TASK_INCOMPLETE_STATES, task_href)
wait_until(TASK_INCOMPLETE_STATES, task_href)
What do you want to wait for here?
Yeah, the wait doesn't make sense anymore. Removing...
But I wanted to share some thoughts I had on this (not wanting to change anything in this PR):
The long task will always be unblocked first because it has a lower pulp_created, and the short one is assured to be blocked as we intend in the test, right?
I was thinking whether we shouldn't sort by immediate task there too (in unblock_task). This is a violation of the event order, but so is our immediate task prioritization in iter_tasks. I can see there is a difference, and it feels like it would be worse to violate that at unblocking time than at picking time, but that's not crystal clear to me.
Well the thing is the immediate task will not be unblocked before the other task is in a final state. And that's the only thing needed to assure resource usage is serialized. It does not matter whether the prior task is identified unblocked or in a waiting, running or canceling state.
* Enforce that immediate tasks are async functions
* Add a deprecation warning that immediate tasks must be async functions starting from pulpcore 3.85
* Add tests to exercise the immediate and deferred dispatch options

Co-authored-by: Matthias Dellweg <[email protected]>
Closes: pulp#5930