Skip to content

Commit 6b12bb2

Browse files
pedro-psbmdellweg
andcommitted
Add timeout to immediate task execution
* Enforce immediate tasks are run are async function * Add deprecation warning that immediate tasks must be async function starting from pulpcore 3.85 * Add tests to exercise immediate and defer dispatch options Co-authored-by: Matthias Dellweg <[email protected]> Closes: #5930
1 parent e8953e3 commit 6b12bb2

File tree

8 files changed

+232
-7
lines changed

8 files changed

+232
-7
lines changed

CHANGES/plugin_api/5930.bugfix

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
Added timeout to tasks dispatched as immediate for both deferred and non-deferred runs.
2+
3+
Also, immediate tasks must be coroutines from now on.
4+
This is to enable immediate tasks to run on the workers foreground without completely blocking heartbeats.
5+
Support for legacy non-coroutines immediate task will be dropped in pulpcore 3.85.

pulpcore/app/tasks/base.py

+28
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
from pulpcore.app.models import CreatedResource
55
from pulpcore.plugin.models import MasterModel
66

7+
from asgiref.sync import sync_to_async
8+
79

810
def general_create_from_temp_file(app_label, serializer_name, temp_file_pk, *args, **kwargs):
911
"""
@@ -111,3 +113,29 @@ def general_multi_delete(instance_ids):
111113
with transaction.atomic():
112114
for instance in instances:
113115
instance.delete()
116+
117+
118+
async def ageneral_update(instance_id, app_label, serializer_name, *args, **kwargs):
119+
"""
120+
Async version of [pulpcore.app.tasks.base.general_update][].
121+
"""
122+
data = kwargs.pop("data", None)
123+
partial = kwargs.pop("partial", False)
124+
serializer_class = get_plugin_config(app_label).named_serializers[serializer_name]
125+
instance = await serializer_class.Meta.model.objects.aget(pk=instance_id)
126+
if isinstance(instance, MasterModel):
127+
instance = await instance.acast()
128+
serializer = serializer_class(instance, data=data, partial=partial)
129+
await sync_to_async(serializer.is_valid)(raise_exception=True)
130+
await sync_to_async(serializer.save)()
131+
132+
133+
async def ageneral_delete(instance_id, app_label, serializer_name):
134+
"""
135+
Async version of [pulpcore.app.tasks.base.general_delete][].
136+
"""
137+
serializer_class = get_plugin_config(app_label).named_serializers[serializer_name]
138+
instance = await serializer_class.Meta.model.objects.aget(pk=instance_id)
139+
if isinstance(instance, MasterModel):
140+
instance = await instance.acast()
141+
await instance.adelete()

pulpcore/app/tasks/test.py

+7
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,13 @@ def sleep(interval):
1414
sleep(interval)
1515

1616

17+
async def asleep(interval):
18+
"""Async function that sleeps."""
19+
import asyncio
20+
21+
await asyncio.sleep(interval)
22+
23+
1724
@backoff.on_exception(backoff.expo, BaseException)
1825
def gooey_task(interval):
1926
"""A sleep task that tries to avoid being killed by ignoring all exceptions."""

pulpcore/app/viewsets/base.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -492,7 +492,7 @@ def update(self, request, pk, **kwargs):
492492
serializer.is_valid(raise_exception=True)
493493
app_label = instance._meta.app_label
494494
task = dispatch(
495-
tasks.base.general_update,
495+
tasks.base.ageneral_update,
496496
exclusive_resources=self.async_reserved_resources(instance),
497497
args=(pk, app_label, serializer.__class__.__name__),
498498
kwargs={"data": request.data, "partial": partial},
@@ -528,7 +528,7 @@ def destroy(self, request, pk, **kwargs):
528528
serializer = self.get_serializer(instance)
529529
app_label = instance._meta.app_label
530530
task = dispatch(
531-
tasks.base.general_delete,
531+
tasks.base.ageneral_delete,
532532
exclusive_resources=self.async_reserved_resources(instance),
533533
args=(pk, app_label, serializer.__class__.__name__),
534534
immediate=self.ALLOW_NON_BLOCKING_DELETE,

pulpcore/constants.py

+2
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@
4545
#: Tasks in an incomplete state have not finished their work yet.
4646
TASK_INCOMPLETE_STATES = (TASK_STATES.WAITING, TASK_STATES.RUNNING, TASK_STATES.CANCELING)
4747

48+
#: Timeout for immediate tasks in seconds
49+
IMMEDIATE_TIMEOUT = 5
4850

4951
SYNC_MODES = SimpleNamespace(ADDITIVE="additive", MIRROR="mirror")
5052
SYNC_CHOICES = (

pulpcore/pytest_plugin.py

+2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import socket
1010
import ssl
1111
import subprocess
12+
import sys
1213
import threading
1314
import uuid
1415

@@ -1017,6 +1018,7 @@ def _dispatch_task(*args, task_group_id=None, **kwargs):
10171018

10181019
assert process.returncode == 0
10191020
task_href = process.stdout.decode().strip()
1021+
print(process.stderr.decode(), file=sys.stderr)
10201022
return task_href
10211023

10221024
return _dispatch_task

pulpcore/tasking/tasks.py

+35-4
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import sys
88
import traceback
99
import tempfile
10+
from asgiref.sync import sync_to_async
1011
from datetime import timedelta
1112
from gettext import gettext as _
1213

@@ -16,12 +17,13 @@
1617
from django_guid import get_guid
1718
from pulpcore.app.apps import MODULE_PLUGIN_VERSIONS
1819
from pulpcore.app.models import Task, TaskGroup
19-
from pulpcore.app.util import current_task, get_domain, get_prn
20+
from pulpcore.app.util import current_task, get_domain, get_prn, deprecation_logger
2021
from pulpcore.constants import (
2122
TASK_FINAL_STATES,
2223
TASK_INCOMPLETE_STATES,
2324
TASK_STATES,
2425
TASK_DISPATCH_LOCK,
26+
IMMEDIATE_TIMEOUT,
2527
)
2628
from pulpcore.tasking.kafka import send_task_notification
2729

@@ -68,11 +70,40 @@ def _execute_task(task):
6870
func = getattr(module, function_name)
6971
args = task.enc_args or ()
7072
kwargs = task.enc_kwargs or {}
71-
result = func(*args, **kwargs)
72-
if asyncio.iscoroutine(result):
73+
immediate = task.immediate
74+
is_coroutine_fn = asyncio.iscoroutinefunction(func)
75+
76+
if not is_coroutine_fn:
77+
if immediate:
78+
deprecation_logger.warning(
79+
_(
80+
"Immediate tasks must be coroutine functions."
81+
"Support for non-coroutine immediate tasks will be dropped"
82+
"in pulpcore 3.85."
83+
)
84+
)
85+
func = sync_to_async(func)
86+
is_coroutine_fn = True
87+
else:
88+
func(*args, **kwargs)
89+
90+
if is_coroutine_fn:
7391
_logger.debug(_("Task is coroutine %s"), task.pk)
92+
coro = func(*args, **kwargs)
93+
if immediate:
94+
coro = asyncio.wait_for(coro, timeout=IMMEDIATE_TIMEOUT)
7495
loop = asyncio.get_event_loop()
75-
loop.run_until_complete(result)
96+
try:
97+
loop.run_until_complete(coro)
98+
except asyncio.TimeoutError:
99+
_logger.info(
100+
_("Immediate task %s timed out after %s seconds."), task.pk, IMMEDIATE_TIMEOUT
101+
)
102+
raise RuntimeError(
103+
_("Immediate task timed out after {timeout} seconds.").format(
104+
timeout=IMMEDIATE_TIMEOUT,
105+
)
106+
)
76107

77108
except Exception:
78109
exc_type, exc, tb = sys.exc_info()

pulpcore/tests/functional/api/test_tasking.py

+151-1
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,10 @@
1010
from uuid import uuid4
1111

1212
from pulpcore.client.pulpcore import ApiException
13+
from contextlib import contextmanager
1314

14-
from pulpcore.tests.functional.utils import download_file
15+
from pulpcore.tests.functional.utils import download_file, PulpTaskError
16+
from pulpcore.constants import IMMEDIATE_TIMEOUT
1517

1618

1719
@pytest.fixture(scope="module")
@@ -445,3 +447,151 @@ def test_cancel_task_group(pulpcore_bindings, dispatch_task_group, gen_user):
445447

446448
with gen_user(model_roles=["core.task_owner"]):
447449
pulpcore_bindings.TaskGroupsApi.task_groups_cancel(tgroup_href, {"state": "canceled"})
450+
451+
452+
LT_TIMEOUT = IMMEDIATE_TIMEOUT / 2
453+
GT_TIMEOUT = IMMEDIATE_TIMEOUT * 2
454+
455+
456+
class TestImmediateTaskWithNoResource:
457+
458+
@pytest.mark.parallel
459+
def test_succeeds_on_api_worker(self, pulpcore_bindings, dispatch_task):
460+
"""
461+
GIVEN a task with no resource requirements
462+
AND the task IS an async function
463+
WHEN dispatching a task as immediate
464+
THEN the task completes with no associated worker
465+
"""
466+
task_href = dispatch_task(
467+
"pulpcore.app.tasks.test.asleep", args=(LT_TIMEOUT,), immediate=True
468+
)
469+
task = pulpcore_bindings.TasksApi.read(task_href)
470+
assert task.state == "completed"
471+
assert task.worker is None
472+
473+
@pytest.mark.parallel
474+
def test_executes_on_api_worker_when_no_async(self, pulpcore_bindings, dispatch_task, capsys):
475+
"""
476+
GIVEN a task with no resource requirements
477+
AND the task IS NOT an async function
478+
WHEN dispatching a task as immediate
479+
THEN the task completes with no associated worker
480+
"""
481+
# TODO: on 3.85 this should throw an error
482+
task_href = dispatch_task(
483+
"pulpcore.app.tasks.test.sleep", args=(LT_TIMEOUT,), immediate=True
484+
)
485+
stderr_content = capsys.readouterr().err
486+
task = pulpcore_bindings.TasksApi.read(task_href)
487+
assert task.state == "completed"
488+
assert task.worker is None
489+
assert "Support for non-coroutine immediate tasks will be dropped" in stderr_content
490+
491+
@pytest.mark.parallel
492+
def test_timeouts_on_api_worker(self, pulpcore_bindings, dispatch_task):
493+
"""
494+
GIVEN a task with no resource requirements
495+
AND the task is an async function
496+
WHEN dispatching a task as immediate
497+
AND it takes longer than timeout
498+
THEN the task fails with a timeout error message
499+
"""
500+
task_href = dispatch_task(
501+
"pulpcore.app.tasks.test.asleep", args=(GT_TIMEOUT,), immediate=True
502+
)
503+
task = pulpcore_bindings.TasksApi.read(task_href)
504+
assert task.worker is None
505+
assert "task timed out after" in task.error["description"]
506+
507+
508+
@pytest.fixture
509+
def resource_blocker(pulpcore_bindings, dispatch_task):
510+
511+
@contextmanager
512+
def _resource_blocker(exclusive_resources: list[str], duration=20):
513+
task_href = dispatch_task(
514+
"pulpcore.app.tasks.test.sleep",
515+
args=(duration,),
516+
exclusive_resources=exclusive_resources,
517+
)
518+
yield
519+
# Trying to cancel a finished task will return a 409 code.
520+
# We can ignore if that's the case, because all we want here is to cut time down.
521+
# Otherwise it might be a real error.
522+
try:
523+
pulpcore_bindings.TasksApi.tasks_cancel(task_href, {"state": "canceled"})
524+
except ApiException as e:
525+
if e.status != 409:
526+
raise
527+
528+
return _resource_blocker
529+
530+
531+
class TestImmediateTaskWithBlockedResource:
532+
533+
@pytest.mark.parallel
534+
def test_executes_in_task_worker(
535+
self, resource_blocker, dispatch_task, monitor_task, pulpcore_bindings
536+
):
537+
"""
538+
GIVEN an async task requiring busy resources
539+
WHEN dispatching a task as immediate
540+
THEN the task completes with a worker
541+
"""
542+
COMMON_RESOURCE = str(uuid4())
543+
with resource_blocker(exclusive_resources=[COMMON_RESOURCE]):
544+
task_href = dispatch_task(
545+
"pulpcore.app.tasks.test.asleep",
546+
args=(LT_TIMEOUT,),
547+
immediate=True,
548+
exclusive_resources=[COMMON_RESOURCE],
549+
)
550+
task = monitor_task(task_href)
551+
assert task.state == "completed"
552+
assert task.worker is not None
553+
554+
@pytest.mark.parallel
555+
def test_throws_when_non_deferrable(
556+
self, resource_blocker, pulpcore_bindings, dispatch_task, monitor_task
557+
):
558+
"""
559+
GIVEN an async task requiring busy resources
560+
WHEN dispatching as immediate and not deferrable
561+
THEN an error is raised
562+
"""
563+
COMMON_RESOURCE = str(uuid4())
564+
with resource_blocker(exclusive_resources=[COMMON_RESOURCE]):
565+
task_href = dispatch_task(
566+
"pulpcore.app.tasks.test.asleep",
567+
args=(0,),
568+
immediate=True,
569+
deferred=False,
570+
exclusive_resources=[COMMON_RESOURCE],
571+
)
572+
task = pulpcore_bindings.TasksApi.read(task_href)
573+
assert task.state == "canceled"
574+
assert task.worker is None
575+
assert "Resources temporarily unavailable." in task.error["reason"]
576+
577+
@pytest.mark.parallel
578+
def test_times_out_on_task_worker(
579+
self, resource_blocker, pulpcore_bindings, dispatch_task, monitor_task
580+
):
581+
"""
582+
GIVEN an async task requiring busy resources
583+
WHEN dispatching a task as immediate
584+
AND it takes longer than timeout
585+
THEN an error is raised
586+
"""
587+
COMMON_RESOURCE = str(uuid4())
588+
with pytest.raises(PulpTaskError) as ctx:
589+
with resource_blocker(exclusive_resources=[COMMON_RESOURCE]):
590+
task_href = dispatch_task(
591+
"pulpcore.app.tasks.test.asleep",
592+
args=(GT_TIMEOUT,),
593+
immediate=True,
594+
exclusive_resources=[COMMON_RESOURCE],
595+
)
596+
monitor_task(task_href)
597+
assert "task timed out after" in ctx.value.task.error["description"]

0 commit comments

Comments
 (0)