-
Notifications
You must be signed in to change notification settings - Fork 121
[PULP-218] Add timeout to immediate tasks #6155
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
Added timeout to tasks dispatched as immediate for both deferred and non-deferred runs. | ||
|
||
Also, immediate tasks must be coroutines from now on. | ||
This is to enable immediate tasks to run on the workers foreground without completely blocking heartbeats. | ||
Support for legacy non-coroutines immediate task will be dropped in pulpcore 3.85. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,8 +10,10 @@ | |
from uuid import uuid4 | ||
|
||
from pulpcore.client.pulpcore import ApiException | ||
from contextlib import contextmanager | ||
|
||
from pulpcore.tests.functional.utils import download_file | ||
from pulpcore.tests.functional.utils import download_file, PulpTaskError | ||
from pulpcore.constants import IMMEDIATE_TIMEOUT | ||
|
||
|
||
@pytest.fixture(scope="module") | ||
|
@@ -445,3 +447,151 @@ def test_cancel_task_group(pulpcore_bindings, dispatch_task_group, gen_user): | |
|
||
with gen_user(model_roles=["core.task_owner"]): | ||
pulpcore_bindings.TaskGroupsApi.task_groups_cancel(tgroup_href, {"state": "canceled"}) | ||
|
||
|
||
LT_TIMEOUT = IMMEDIATE_TIMEOUT / 2 | ||
GT_TIMEOUT = IMMEDIATE_TIMEOUT * 2 | ||
|
||
|
||
class TestImmediateTaskWithNoResource: | ||
|
||
@pytest.mark.parallel | ||
def test_succeeds_on_api_worker(self, pulpcore_bindings, dispatch_task): | ||
""" | ||
GIVEN a task with no resource requirements | ||
AND the task IS an async function | ||
WHEN dispatching a task as immediate | ||
THEN the task completes with no associated worker | ||
""" | ||
task_href = dispatch_task( | ||
"pulpcore.app.tasks.test.asleep", args=(LT_TIMEOUT,), immediate=True | ||
) | ||
Comment on lines
+466
to
+468
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Incidentally, these kinds of tasks are now a big driver to wish for asgi pulp instead of wsgi. |
||
task = pulpcore_bindings.TasksApi.read(task_href) | ||
assert task.state == "completed" | ||
assert task.worker is None | ||
|
||
@pytest.mark.parallel | ||
def test_executes_on_api_worker_when_no_async(self, pulpcore_bindings, dispatch_task, capsys): | ||
""" | ||
GIVEN a task with no resource requirements | ||
AND the task IS NOT an async function | ||
WHEN dispatching a task as immediate | ||
THEN the task completes with no associated worker | ||
""" | ||
# TODO: on 3.85 this should throw an error | ||
task_href = dispatch_task( | ||
"pulpcore.app.tasks.test.sleep", args=(LT_TIMEOUT,), immediate=True | ||
) | ||
stderr_content = capsys.readouterr().err | ||
task = pulpcore_bindings.TasksApi.read(task_href) | ||
assert task.state == "completed" | ||
assert task.worker is None | ||
assert "Support for non-coroutine immediate tasks will be dropped" in stderr_content | ||
|
||
@pytest.mark.parallel | ||
def test_timeouts_on_api_worker(self, pulpcore_bindings, dispatch_task): | ||
""" | ||
GIVEN a task with no resource requirements | ||
AND the task is an async function | ||
WHEN dispatching a task as immediate | ||
AND it takes longer than timeout | ||
THEN the task fails with a timeout error message | ||
""" | ||
task_href = dispatch_task( | ||
"pulpcore.app.tasks.test.asleep", args=(GT_TIMEOUT,), immediate=True | ||
) | ||
task = pulpcore_bindings.TasksApi.read(task_href) | ||
assert task.worker is None | ||
assert "task timed out after" in task.error["description"] | ||
|
||
|
||
@pytest.fixture | ||
def resource_blocker(pulpcore_bindings, dispatch_task): | ||
|
||
@contextmanager | ||
def _resource_blocker(exclusive_resources: list[str], duration=20): | ||
task_href = dispatch_task( | ||
"pulpcore.app.tasks.test.sleep", | ||
args=(duration,), | ||
exclusive_resources=exclusive_resources, | ||
) | ||
yield | ||
# Trying to cancel a finished task will return a 409 code. | ||
# We can ignore if that's the case, because all we want here is to cut time down. | ||
# Otherwise it might be a real error. | ||
try: | ||
pulpcore_bindings.TasksApi.tasks_cancel(task_href, {"state": "canceled"}) | ||
except ApiException as e: | ||
if e.status != 409: | ||
raise | ||
|
||
return _resource_blocker | ||
|
||
|
||
class TestImmediateTaskWithBlockedResource: | ||
|
||
@pytest.mark.parallel | ||
def test_executes_in_task_worker( | ||
self, resource_blocker, dispatch_task, monitor_task, pulpcore_bindings | ||
): | ||
""" | ||
GIVEN an async task requiring busy resources | ||
WHEN dispatching a task as immediate | ||
THEN the task completes with a worker | ||
""" | ||
COMMON_RESOURCE = str(uuid4()) | ||
with resource_blocker(exclusive_resources=[COMMON_RESOURCE]): | ||
task_href = dispatch_task( | ||
"pulpcore.app.tasks.test.asleep", | ||
args=(LT_TIMEOUT,), | ||
immediate=True, | ||
exclusive_resources=[COMMON_RESOURCE], | ||
) | ||
task = monitor_task(task_href) | ||
assert task.state == "completed" | ||
assert task.worker is not None | ||
|
||
@pytest.mark.parallel | ||
def test_throws_when_non_deferrable( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is not a newly defined behaviour. Can you do me a favor and see whether we already have a test for this? (Ideally we combine them or pick the nicer one...) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks like we don't test it elsewhere:
|
||
self, resource_blocker, pulpcore_bindings, dispatch_task, monitor_task | ||
): | ||
""" | ||
GIVEN an async task requiring busy resources | ||
WHEN dispatching as immediate and not deferrable | ||
THEN an error is raised | ||
""" | ||
COMMON_RESOURCE = str(uuid4()) | ||
with resource_blocker(exclusive_resources=[COMMON_RESOURCE]): | ||
task_href = dispatch_task( | ||
"pulpcore.app.tasks.test.asleep", | ||
args=(0,), | ||
immediate=True, | ||
deferred=False, | ||
exclusive_resources=[COMMON_RESOURCE], | ||
) | ||
task = pulpcore_bindings.TasksApi.read(task_href) | ||
assert task.state == "canceled" | ||
assert task.worker is None | ||
assert "Resources temporarily unavailable." in task.error["reason"] | ||
|
||
@pytest.mark.parallel | ||
def test_times_out_on_task_worker( | ||
self, resource_blocker, pulpcore_bindings, dispatch_task, monitor_task | ||
): | ||
""" | ||
GIVEN an async task requiring busy resources | ||
WHEN dispatching a task as immediate | ||
AND it takes longer than timeout | ||
THEN an error is raised | ||
""" | ||
COMMON_RESOURCE = str(uuid4()) | ||
with pytest.raises(PulpTaskError) as ctx: | ||
with resource_blocker(exclusive_resources=[COMMON_RESOURCE]): | ||
task_href = dispatch_task( | ||
"pulpcore.app.tasks.test.asleep", | ||
args=(GT_TIMEOUT,), | ||
immediate=True, | ||
exclusive_resources=[COMMON_RESOURCE], | ||
) | ||
monitor_task(task_href) | ||
assert "task timed out after" in ctx.value.task.error["description"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure if it's wise to call this thing coro given the confusion with that name throughout the python docs.
The only idea i can come up with is "awaitable".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The usage is a bit confusing but there is a clear definition.
Do you think we can try to be strict about those terms?
IMO "awaitable" is too broad, we really expect a coroutine object, which is returned by a coroutine function.
But I can change if you still think that would be confusing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's probably ok.