Skip to content

[PULP-218] Add timeout to immediate tasks #6155

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGES/plugin_api/5930.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Added timeout to tasks dispatched as immediate for both deferred and non-deferred runs.

Also, immediate tasks must be coroutines from now on.
This is to enable immediate tasks to run on the workers foreground without completely blocking heartbeats.
Support for legacy non-coroutines immediate task will be dropped in pulpcore 3.85.
28 changes: 28 additions & 0 deletions pulpcore/app/tasks/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from pulpcore.app.models import CreatedResource
from pulpcore.plugin.models import MasterModel

from asgiref.sync import sync_to_async


def general_create_from_temp_file(app_label, serializer_name, temp_file_pk, *args, **kwargs):
"""
Expand Down Expand Up @@ -111,3 +113,29 @@ def general_multi_delete(instance_ids):
with transaction.atomic():
for instance in instances:
instance.delete()


async def ageneral_update(instance_id, app_label, serializer_name, *args, **kwargs):
"""
Async version of [pulpcore.app.tasks.base.general_update][].
"""
data = kwargs.pop("data", None)
partial = kwargs.pop("partial", False)
serializer_class = get_plugin_config(app_label).named_serializers[serializer_name]
instance = await serializer_class.Meta.model.objects.aget(pk=instance_id)
if isinstance(instance, MasterModel):
instance = await instance.acast()
serializer = serializer_class(instance, data=data, partial=partial)
await sync_to_async(serializer.is_valid)(raise_exception=True)
await sync_to_async(serializer.save)()


async def ageneral_delete(instance_id, app_label, serializer_name):
"""
Async version of [pulpcore.app.tasks.base.general_delete][].
"""
serializer_class = get_plugin_config(app_label).named_serializers[serializer_name]
instance = await serializer_class.Meta.model.objects.aget(pk=instance_id)
if isinstance(instance, MasterModel):
instance = await instance.acast()
await instance.adelete()
7 changes: 7 additions & 0 deletions pulpcore/app/tasks/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,13 @@ def sleep(interval):
sleep(interval)


async def asleep(interval):
"""Async function that sleeps."""
import asyncio

await asyncio.sleep(interval)


@backoff.on_exception(backoff.expo, BaseException)
def gooey_task(interval):
"""A sleep task that tries to avoid being killed by ignoring all exceptions."""
Expand Down
4 changes: 2 additions & 2 deletions pulpcore/app/viewsets/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ def update(self, request, pk, **kwargs):
serializer.is_valid(raise_exception=True)
app_label = instance._meta.app_label
task = dispatch(
tasks.base.general_update,
tasks.base.ageneral_update,
exclusive_resources=self.async_reserved_resources(instance),
args=(pk, app_label, serializer.__class__.__name__),
kwargs={"data": request.data, "partial": partial},
Expand Down Expand Up @@ -528,7 +528,7 @@ def destroy(self, request, pk, **kwargs):
serializer = self.get_serializer(instance)
app_label = instance._meta.app_label
task = dispatch(
tasks.base.general_delete,
tasks.base.ageneral_delete,
exclusive_resources=self.async_reserved_resources(instance),
args=(pk, app_label, serializer.__class__.__name__),
immediate=self.ALLOW_NON_BLOCKING_DELETE,
Expand Down
2 changes: 2 additions & 0 deletions pulpcore/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
#: Tasks in an incomplete state have not finished their work yet.
TASK_INCOMPLETE_STATES = (TASK_STATES.WAITING, TASK_STATES.RUNNING, TASK_STATES.CANCELING)

#: Timeout for immediate tasks in seconds
IMMEDIATE_TIMEOUT = 5

SYNC_MODES = SimpleNamespace(ADDITIVE="additive", MIRROR="mirror")
SYNC_CHOICES = (
Expand Down
2 changes: 2 additions & 0 deletions pulpcore/pytest_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import socket
import ssl
import subprocess
import sys
import threading
import uuid

Expand Down Expand Up @@ -1017,6 +1018,7 @@ def _dispatch_task(*args, task_group_id=None, **kwargs):

assert process.returncode == 0
task_href = process.stdout.decode().strip()
print(process.stderr.decode(), file=sys.stderr)
return task_href

return _dispatch_task
Expand Down
39 changes: 35 additions & 4 deletions pulpcore/tasking/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import sys
import traceback
import tempfile
from asgiref.sync import sync_to_async
from datetime import timedelta
from gettext import gettext as _

Expand All @@ -16,12 +17,13 @@
from django_guid import get_guid
from pulpcore.app.apps import MODULE_PLUGIN_VERSIONS
from pulpcore.app.models import Task, TaskGroup
from pulpcore.app.util import current_task, get_domain, get_prn
from pulpcore.app.util import current_task, get_domain, get_prn, deprecation_logger
from pulpcore.constants import (
TASK_FINAL_STATES,
TASK_INCOMPLETE_STATES,
TASK_STATES,
TASK_DISPATCH_LOCK,
IMMEDIATE_TIMEOUT,
)
from pulpcore.tasking.kafka import send_task_notification

Expand Down Expand Up @@ -68,11 +70,40 @@ def _execute_task(task):
func = getattr(module, function_name)
args = task.enc_args or ()
kwargs = task.enc_kwargs or {}
result = func(*args, **kwargs)
if asyncio.iscoroutine(result):
immediate = task.immediate
is_coroutine_fn = asyncio.iscoroutinefunction(func)

if not is_coroutine_fn:
if immediate:
deprecation_logger.warning(
_(
"Immediate tasks must be coroutine functions."
"Support for non-coroutine immediate tasks will be dropped"
"in pulpcore 3.85."
)
)
func = sync_to_async(func)
is_coroutine_fn = True
else:
func(*args, **kwargs)

if is_coroutine_fn:
_logger.debug(_("Task is coroutine %s"), task.pk)
coro = func(*args, **kwargs)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if it's wise to call this thing coro given the confusion with that name throughout the python docs.
The only idea i can come up with is "awaitable".

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The usage is a bit confusing but there is a clear definition.
Do you think we can try to be strict about those terms?

IMO "awaitable" is too broad, we really expect a coroutine object, which is returned by a coroutine function.
But I can change if you still think that would be confusing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's probably ok.

if immediate:
coro = asyncio.wait_for(coro, timeout=IMMEDIATE_TIMEOUT)
loop = asyncio.get_event_loop()
loop.run_until_complete(result)
try:
loop.run_until_complete(coro)
except asyncio.TimeoutError:
_logger.info(
_("Immediate task %s timed out after %s seconds."), task.pk, IMMEDIATE_TIMEOUT
)
raise RuntimeError(
_("Immediate task timed out after {timeout} seconds.").format(
timeout=IMMEDIATE_TIMEOUT,
)
)

except Exception:
exc_type, exc, tb = sys.exc_info()
Expand Down
152 changes: 151 additions & 1 deletion pulpcore/tests/functional/api/test_tasking.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
from uuid import uuid4

from pulpcore.client.pulpcore import ApiException
from contextlib import contextmanager

from pulpcore.tests.functional.utils import download_file
from pulpcore.tests.functional.utils import download_file, PulpTaskError
from pulpcore.constants import IMMEDIATE_TIMEOUT


@pytest.fixture(scope="module")
Expand Down Expand Up @@ -445,3 +447,151 @@ def test_cancel_task_group(pulpcore_bindings, dispatch_task_group, gen_user):

with gen_user(model_roles=["core.task_owner"]):
pulpcore_bindings.TaskGroupsApi.task_groups_cancel(tgroup_href, {"state": "canceled"})


LT_TIMEOUT = IMMEDIATE_TIMEOUT / 2
GT_TIMEOUT = IMMEDIATE_TIMEOUT * 2


class TestImmediateTaskWithNoResource:

@pytest.mark.parallel
def test_succeeds_on_api_worker(self, pulpcore_bindings, dispatch_task):
"""
GIVEN a task with no resource requirements
AND the task IS an async function
WHEN dispatching a task as immediate
THEN the task completes with no associated worker
"""
task_href = dispatch_task(
"pulpcore.app.tasks.test.asleep", args=(LT_TIMEOUT,), immediate=True
)
Comment on lines +466 to +468
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incidentally, these kinds of tasks are now a big driver to wish for asgi pulp instead of wsgi.

task = pulpcore_bindings.TasksApi.read(task_href)
assert task.state == "completed"
assert task.worker is None

@pytest.mark.parallel
def test_executes_on_api_worker_when_no_async(self, pulpcore_bindings, dispatch_task, capsys):
"""
GIVEN a task with no resource requirements
AND the task IS NOT an async function
WHEN dispatching a task as immediate
THEN the task completes with no associated worker
"""
# TODO: on 3.85 this should throw an error
task_href = dispatch_task(
"pulpcore.app.tasks.test.sleep", args=(LT_TIMEOUT,), immediate=True
)
stderr_content = capsys.readouterr().err
task = pulpcore_bindings.TasksApi.read(task_href)
assert task.state == "completed"
assert task.worker is None
assert "Support for non-coroutine immediate tasks will be dropped" in stderr_content

@pytest.mark.parallel
def test_timeouts_on_api_worker(self, pulpcore_bindings, dispatch_task):
"""
GIVEN a task with no resource requirements
AND the task is an async function
WHEN dispatching a task as immediate
AND it takes longer than timeout
THEN the task fails with a timeout error message
"""
task_href = dispatch_task(
"pulpcore.app.tasks.test.asleep", args=(GT_TIMEOUT,), immediate=True
)
task = pulpcore_bindings.TasksApi.read(task_href)
assert task.worker is None
assert "task timed out after" in task.error["description"]


@pytest.fixture
def resource_blocker(pulpcore_bindings, dispatch_task):

@contextmanager
def _resource_blocker(exclusive_resources: list[str], duration=20):
task_href = dispatch_task(
"pulpcore.app.tasks.test.sleep",
args=(duration,),
exclusive_resources=exclusive_resources,
)
yield
# Trying to cancel a finished task will return a 409 code.
# We can ignore if that's the case, because all we want here is to cut time down.
# Otherwise it might be a real error.
try:
pulpcore_bindings.TasksApi.tasks_cancel(task_href, {"state": "canceled"})
except ApiException as e:
if e.status != 409:
raise

return _resource_blocker


class TestImmediateTaskWithBlockedResource:

@pytest.mark.parallel
def test_executes_in_task_worker(
self, resource_blocker, dispatch_task, monitor_task, pulpcore_bindings
):
"""
GIVEN an async task requiring busy resources
WHEN dispatching a task as immediate
THEN the task completes with a worker
"""
COMMON_RESOURCE = str(uuid4())
with resource_blocker(exclusive_resources=[COMMON_RESOURCE]):
task_href = dispatch_task(
"pulpcore.app.tasks.test.asleep",
args=(LT_TIMEOUT,),
immediate=True,
exclusive_resources=[COMMON_RESOURCE],
)
task = monitor_task(task_href)
assert task.state == "completed"
assert task.worker is not None

@pytest.mark.parallel
def test_throws_when_non_deferrable(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a newly defined behaviour. Can you do me a favor and see whether we already have a test for this?

(Ideally we combine them or pick the nicer one...)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we don't test it elsewhere:

> git grep -I -n defer -- pulpcore/tests/
pulpcore/tests/functional/api/test_tasking.py:564:    def test_throws_when_non_deferrable(
pulpcore/tests/functional/api/test_tasking.py:569:        WHEN dispatching as immediate and not deferrable
pulpcore/tests/functional/api/test_tasking.py:578:                deferred=False,

self, resource_blocker, pulpcore_bindings, dispatch_task, monitor_task
):
"""
GIVEN an async task requiring busy resources
WHEN dispatching as immediate and not deferrable
THEN an error is raised
"""
COMMON_RESOURCE = str(uuid4())
with resource_blocker(exclusive_resources=[COMMON_RESOURCE]):
task_href = dispatch_task(
"pulpcore.app.tasks.test.asleep",
args=(0,),
immediate=True,
deferred=False,
exclusive_resources=[COMMON_RESOURCE],
)
task = pulpcore_bindings.TasksApi.read(task_href)
assert task.state == "canceled"
assert task.worker is None
assert "Resources temporarily unavailable." in task.error["reason"]

@pytest.mark.parallel
def test_times_out_on_task_worker(
self, resource_blocker, pulpcore_bindings, dispatch_task, monitor_task
):
"""
GIVEN an async task requiring busy resources
WHEN dispatching a task as immediate
AND it takes longer than timeout
THEN an error is raised
"""
COMMON_RESOURCE = str(uuid4())
with pytest.raises(PulpTaskError) as ctx:
with resource_blocker(exclusive_resources=[COMMON_RESOURCE]):
task_href = dispatch_task(
"pulpcore.app.tasks.test.asleep",
args=(GT_TIMEOUT,),
immediate=True,
exclusive_resources=[COMMON_RESOURCE],
)
monitor_task(task_href)
assert "task timed out after" in ctx.value.task.error["description"]