Skip to content

Commit 6658c42

Browse files
committed
wip: review iteration
1 parent f46846b commit 6658c42

File tree

3 files changed

+42
-61
lines changed

3 files changed

+42
-61
lines changed

pulpcore/pytest_plugin.py

Lines changed: 20 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import socket
1010
import ssl
1111
import subprocess
12+
import sys
1213
import threading
1314
import uuid
1415

@@ -993,53 +994,32 @@ def _wget_recursive_download_on_host(url, destination):
993994
# Tasking related fixtures
994995

995996

996-
def _build_dispatch_cmd(pulpcore_bindings, task_group_id, args, kwargs):
997-
cid = pulpcore_bindings.client.default_headers.get("Correlation-ID") or str(uuid.uuid4())
998-
username = pulpcore_bindings.client.configuration.username
999-
return (
1000-
"from django_guid import set_guid; "
1001-
"from pulpcore.tasking.tasks import dispatch; "
1002-
"from pulpcore.app.models import TaskGroup; "
1003-
"from pulpcore.app.util import get_url, set_current_user; "
1004-
"from django.contrib.auth import get_user_model; "
1005-
"User = get_user_model(); "
1006-
f"user = User.objects.filter(username='{username}').first(); "
1007-
"set_current_user(user); "
1008-
f"set_guid({cid!r}); "
1009-
f"tg = {task_group_id!r} and TaskGroup.objects.filter(pk={task_group_id!r}).first(); "
1010-
f"task = dispatch(*{args!r}, task_group=tg, **{kwargs!r}); "
1011-
"print(get_url(task))"
1012-
)
1013-
1014-
1015997
@pytest.fixture(scope="session")
1016998
def dispatch_task(pulpcore_bindings):
1017999
def _dispatch_task(*args, task_group_id=None, **kwargs):
1018-
commands = _build_dispatch_cmd(pulpcore_bindings, task_group_id, args, kwargs)
1019-
process = subprocess.run(["pulpcore-manager", "shell", "-c", commands], capture_output=True)
1020-
assert process.returncode == 0
1021-
task_href = process.stdout.decode().strip()
1022-
return task_href
1023-
1024-
return _dispatch_task
1025-
1026-
1027-
@pytest.fixture(scope="session")
1028-
def dispatch_task_with_stderr(pulpcore_bindings):
1029-
"""Variation of dispatch_task that returns (task_href, stderr_content).
1030-
1031-
The stderr can be useful write asserts over log messages.
1032-
This is required because the pytest caplog fixture can't capture log messages
1033-
streamed from the pulpcore-manager call.
1034-
"""
1000+
cid = pulpcore_bindings.client.default_headers.get("Correlation-ID") or str(uuid.uuid4())
1001+
username = pulpcore_bindings.client.configuration.username
1002+
commands = (
1003+
"from django_guid import set_guid; "
1004+
"from pulpcore.tasking.tasks import dispatch; "
1005+
"from pulpcore.app.models import TaskGroup; "
1006+
"from pulpcore.app.util import get_url, set_current_user; "
1007+
"from django.contrib.auth import get_user_model; "
1008+
"User = get_user_model(); "
1009+
f"user = User.objects.filter(username='{username}').first(); "
1010+
"set_current_user(user); "
1011+
f"set_guid({cid!r}); "
1012+
f"tg = {task_group_id!r} and TaskGroup.objects.filter(pk={task_group_id!r}).first(); "
1013+
f"task = dispatch(*{args!r}, task_group=tg, **{kwargs!r}); "
1014+
"print(get_url(task))"
1015+
)
10351016

1036-
def _dispatch_task(*args, task_group_id=None, **kwargs):
1037-
commands = _build_dispatch_cmd(pulpcore_bindings, task_group_id, args, kwargs)
10381017
process = subprocess.run(["pulpcore-manager", "shell", "-c", commands], capture_output=True)
1018+
10391019
assert process.returncode == 0
10401020
task_href = process.stdout.decode().strip()
1041-
stderr = process.stderr.decode()
1042-
return task_href, stderr
1021+
print(process.stderr.decode(), file=sys.stderr)
1022+
return task_href
10431023

10441024
return _dispatch_task
10451025

pulpcore/tasking/tasks.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,6 @@ def _execute_task(task):
9292
coro = func(*args, **kwargs)
9393
if immediate:
9494
coro = asyncio.wait_for(coro, timeout=IMMEDIATE_TIMEOUT)
95-
assert asyncio.iscoroutine(coro), coro
9695
loop = asyncio.get_event_loop()
9796
try:
9897
loop.run_until_complete(coro)

pulpcore/tests/functional/api/test_tasking.py

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -471,25 +471,22 @@ def test_succeeds_on_api_worker(self, pulpcore_bindings, dispatch_task):
471471
assert task.worker is None
472472

473473
@pytest.mark.parallel
474-
def test_executes_on_api_worker_when_no_async(
475-
self,
476-
pulpcore_bindings,
477-
dispatch_task_with_stderr,
478-
):
474+
def test_executes_on_api_worker_when_no_async(self, pulpcore_bindings, dispatch_task, capsys):
479475
"""
480476
GIVEN a task with no resource requirements
481477
AND the task IS NOT an async function
482478
WHEN dispatching a task as immediate
483479
THEN the task completes with no associated worker
484480
"""
485481
# TODO: on 3.85 this should throw an error
486-
task_href, stderr = dispatch_task_with_stderr(
482+
task_href = dispatch_task(
487483
"pulpcore.app.tasks.test.sleep", args=(LT_TIMEOUT,), immediate=True
488484
)
485+
stderr_content = capsys.readouterr().err
489486
task = pulpcore_bindings.TasksApi.read(task_href)
490487
assert task.state == "completed"
491488
assert task.worker is None
492-
assert "Support for non-coroutine immediate tasks will be dropped" in stderr
489+
assert "Support for non-coroutine immediate tasks will be dropped" in stderr_content
493490

494491
@pytest.mark.parallel
495492
def test_timeouts_on_api_worker(self, pulpcore_bindings, dispatch_task):
@@ -509,7 +506,7 @@ def test_timeouts_on_api_worker(self, pulpcore_bindings, dispatch_task):
509506

510507

511508
@pytest.fixture
512-
def dispatch_long_task(pulpcore_bindings, dispatch_task):
509+
def resource_blocker(pulpcore_bindings, dispatch_task):
513510

514511
def wait_until(state, task_href, timeout=10):
515512
for i in range(timeout):
@@ -520,34 +517,39 @@ def wait_until(state, task_href, timeout=10):
520517
raise RuntimeError("Timeout waiting for task to transition")
521518

522519
@contextmanager
523-
def _dispatch_long_task(required_resources: list[str], duration=5):
520+
def _resource_blocker(exclusive_resources: list[str], duration=5):
524521
task_href = dispatch_task(
525522
"pulpcore.app.tasks.test.sleep",
526523
args=(duration,),
527-
exclusive_resources=required_resources,
524+
exclusive_resources=exclusive_resources,
528525
)
529526
wait_until("running", task_href)
530527
yield
531-
task = pulpcore_bindings.TasksApi.read(task_href)
532-
if task.state == "running":
528+
# Trying to cancel a finished task will return a 409 code.
529+
# We can ignore if that's the case, because all we want here is to cut time down.
530+
# Otherwise it might be a real error.
531+
try:
533532
pulpcore_bindings.TasksApi.tasks_cancel(task_href, {"state": "canceled"})
533+
except ApiException as e:
534+
if e.status != 409:
535+
raise
534536

535-
return _dispatch_long_task
537+
return _resource_blocker
536538

537539

538540
class TestImmediateTaskWithBlockedResource:
539541

540542
@pytest.mark.parallel
541543
def test_executes_in_task_worker(
542-
self, dispatch_long_task, dispatch_task, monitor_task, pulpcore_bindings
544+
self, resource_blocker, dispatch_task, monitor_task, pulpcore_bindings
543545
):
544546
"""
545547
GIVEN an async task requiring busy resources
546548
WHEN dispatching a task as immediate
547549
THEN the task completes with a worker
548550
"""
549551
COMMON_RESOURCE = str(uuid4())
550-
with dispatch_long_task(required_resources=[COMMON_RESOURCE]):
552+
with resource_blocker(exclusive_resources=[COMMON_RESOURCE]):
551553
task_href = dispatch_task(
552554
"pulpcore.app.tasks.test.asleep",
553555
args=(LT_TIMEOUT,),
@@ -560,15 +562,15 @@ def test_executes_in_task_worker(
560562

561563
@pytest.mark.parallel
562564
def test_throws_when_non_deferrable(
563-
self, dispatch_long_task, pulpcore_bindings, dispatch_task, monitor_task
565+
self, resource_blocker, pulpcore_bindings, dispatch_task, monitor_task
564566
):
565567
"""
566568
GIVEN an async task requiring busy resources
567569
WHEN dispatching as immediate and not deferrable
568570
THEN an error is raised
569571
"""
570572
COMMON_RESOURCE = str(uuid4())
571-
with dispatch_long_task(required_resources=[COMMON_RESOURCE]):
573+
with resource_blocker(exclusive_resources=[COMMON_RESOURCE]):
572574
task_href = dispatch_task(
573575
"pulpcore.app.tasks.test.asleep",
574576
args=(0,),
@@ -582,8 +584,8 @@ def test_throws_when_non_deferrable(
582584
assert "Resources temporarily unavailable." in task.error["reason"]
583585

584586
@pytest.mark.parallel
585-
def test_timeouts_on_task_worker(
586-
self, dispatch_long_task, pulpcore_bindings, dispatch_task, monitor_task
587+
def test_times_out_on_task_worker(
588+
self, resource_blocker, pulpcore_bindings, dispatch_task, monitor_task
587589
):
588590
"""
589591
GIVEN an async task requiring busy resources
@@ -593,7 +595,7 @@ def test_timeouts_on_task_worker(
593595
"""
594596
COMMON_RESOURCE = str(uuid4())
595597
with pytest.raises(PulpTaskError) as ctx:
596-
with dispatch_long_task(required_resources=[COMMON_RESOURCE]):
598+
with resource_blocker(exclusive_resources=[COMMON_RESOURCE]):
597599
task_href = dispatch_task(
598600
"pulpcore.app.tasks.test.asleep",
599601
args=(GT_TIMEOUT,),

0 commit comments

Comments
 (0)