Skip to content

[PULP-218] Add timeout to immediate tasks #6155

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 26, 2025

Conversation

pedro-psb
Copy link
Member

No description provided.

@pedro-psb pedro-psb changed the title Add timeout to immediate tasks [PULP-218] Add timeout to immediate tasks Dec 16, 2024
Comment on lines 62 to 63
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(IMMEDIATE_TASK_TIMEOUT)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, interesting.
This is quite close to low level process handling. I wonder if this might interfere with other uses of the signal handler.

Do you think we can instead require that all immediate tasks are async functions? We could then implement the timeout on the asyncio level with much more confidence.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think we can instead require that all immediate tasks are async functions? We could then implement the timeout on the asyncio level with much more confidence.

Yes. I dont have a good idea of how much friction this imposes, but that's definitely more safe.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mdellweg Hey! Can you provide an early review for the api worker part?

@pedro-psb pedro-psb force-pushed the add-timeout-to-immediate-task branch 2 times, most recently from 138be99 to 08a9c14 Compare February 11, 2025 22:10
@pedro-psb pedro-psb linked an issue Feb 12, 2025 that may be closed by this pull request
@pedro-psb pedro-psb force-pushed the add-timeout-to-immediate-task branch 4 times, most recently from 54c86f9 to 1704eb4 Compare February 12, 2025 22:08
@pedro-psb pedro-psb marked this pull request as ready for review February 12, 2025 22:18
@pedro-psb pedro-psb force-pushed the add-timeout-to-immediate-task branch from 1704eb4 to 88fc96d Compare February 12, 2025 22:32
loop.run_until_complete(result)
if immediate:
try:
loop.run_until_complete(asyncio.wait_for(result, timeout=IMMEDIATE_TIMEOUT))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like a good first cut. But we still stop the heartbeat loop for up to 5 seconds here.

We should definitely plan on rewriting the workers in async.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should definitely plan on rewriting the workers in async.

Would you say that's a blocker (for running on foreground)?

I was trying to find a way to integrate the coroutine control yielding with the select-loop, but I'm reaching dark corners.

If that's a blocker this PR could be really about the timeout (as originally intented). The commit "wip: add timeout to supervise immediate tasks" adds a timeout to the regular task process. Maybe we could stop there.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessarily. It's a certain risk. But it's nothing we cannot revert when things go bad.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe after all it's wise to split the work into smaller chunks to reduce that risk.
Ensuring that immediate tasks don't run too long is still an incremental improvement.

@pedro-psb pedro-psb force-pushed the add-timeout-to-immediate-task branch from 6b2781e to 1b427eb Compare February 13, 2025 19:53
@mdellweg
Copy link
Member

We cannot require all immediate tasks to be async without properly deprecating it for plugins.
But we can use sync_to_async for the ones that aren't with a deprecation warning.

@pedro-psb pedro-psb force-pushed the add-timeout-to-immediate-task branch from 00a071e to e834821 Compare February 14, 2025 14:21
@pedro-psb
Copy link
Member Author

No idea why this fails only on s3:
https://github.com/pulp/pulpcore/actions/runs/13333279626/job/37242548757?pr=6155#step:14:18219

An exception does get caught...

@pedro-psb pedro-psb force-pushed the add-timeout-to-immediate-task branch from 4edeef0 to a7a94b9 Compare March 17, 2025 19:46
Copy link
Member

@mdellweg mdellweg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if you wanted a review at this stage...

@@ -45,6 +45,8 @@
#: Tasks in an incomplete state have not finished their work yet.
TASK_INCOMPLETE_STATES = (TASK_STATES.WAITING, TASK_STATES.RUNNING, TASK_STATES.CANCELING)

#: Timeout for immediate tasks
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is that seconds?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I'll add that on the description

@@ -50,7 +52,6 @@ def wakeup_worker():


def execute_task(task):
# This extra stack is needed to isolate the current_task ContextVar
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why drop this comment?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess that was an accident

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Absolutely not. But then it doesn't seem to be telling its story.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean dropping the comment was an accident.
The comment is fine. I didnt know what a ContextVar was when I first read it, but then I read about it an it made sense.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ohhh. Thanks for the clarification...

Comment on lines 72 to 74
if asyncio.iscoroutine(result):
immediate = task.immediate
is_coroutine = asyncio.iscoroutine(result)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Has this ever been true? Should we no check if func is a coroutine even before executing it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gerrod showed that this is valid here. But I can look up a more intuitive form to check it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The big problem is that we run the task function first and then we check if it returned something awaitable.
(That is what coroutine functions are: They are functions returning an awaitable generator object.)

While in the old code this may be working properly (unless some task accidentally returned a generator of any sort), here we don't want to run the task function in case it is immediate and not async.

So what should happen, i think, is we should check for asyncio.iscoroutinefunction(func) and then decide whether or how to execute it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sadly the python documentation itself uses the term "coroutine" a bit inconsistently.
Sometimes as the function declared with async, and sometimes as the awaitable object that is the result of calling the former.

@@ -388,6 +390,7 @@ def supervise_task(self, task):
task.save(update_fields=["worker"])
cancel_state = None
cancel_reason = None
time_limit = time.monotonic() + IMMEDIATE_TIMEOUT
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we not use timezone.now here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also didn't we want to let the timeout tick in the async loop?
That will help us maybe run even more than one immediate tasks per worker eventually.
Also the timeout will be in effect even on the api worker.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, that's true 🤦
The worker ends up calling execute_task, so we only need it there with the async loop...

)
wait_until("running", task_href)

# Case 1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each case should be a separate test. If you want to share resources, you can group them in a class.

@github-actions github-actions bot added the wip label Mar 19, 2025
AND it takes longer than timeout
THEN an error is raised
"""
COMMON_RESOURCE = "PPP"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd make the resource a random string to avoid any collisions.
Also why not mark this as parallel too?

# Case 3
dispatch_long_task(requires_resource=COMMON_RESOURCE)
with pytest.raises(PulpTaskError) as ctx:
class TestImmediateTaskWithLocking:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about TestImmediateTaskWithBlockedResource?
When removing the "test" noise, the actual (first) test will read:
"Immediate task with blocked resource executes in task worker." Exactly what we want to read here...

THEN the task completes with a worker
"""
COMMON_RESOURCE = "MMM"
dispatch_long_task(required_resources=[COMMON_RESOURCE])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You could cancel the long task after dispatching the regular one in order to speed up the tests dramatically.


# Case 2
with pytest.raises(PulpTaskError) as ctx:
class TestImmediateTaskNoLocking:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TestImmediateTaskWithNoResource?

class TestImmediateTaskNoLocking:

@pytest.mark.parallel
def test_succeed_on_api_worker(self, pulpcore_bindings, dispatch_task, monitor_task):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is really nit picky...
test_succeeds_on_api_worker

def test_immediate_task_requires_resource(pulpcore_bindings, dispatch_task, monitor_task):
"""
GIVEN an async task requiring busy resources
task = monitor_task(task_href)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we monitor that task? Should it not return as completed already from dispatch_task? Also we should be able to catch the deprecation warning here and assert on it.
Also we don't want it to bubble into the collected deprecation warnings for the ci.

_logger.debug(_("Task is coroutine %s"), task.pk)
coro = func(*args, **kwargs)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if it's wise to call this thing coro given the confusion with that name throughout the python docs.
The only idea i can come up with is "awaitable".

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The usage is a bit confusing but there is a clear definition.
Do you think we can try to be strict about those terms?

IMO "awaitable" is too broad, we really expect a coroutine object, which is returned by a coroutine function.
But I can change if you still think that would be confusing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's probably ok.

coro = result

coro = asyncio.wait_for(coro, timeout=IMMEDIATE_TIMEOUT)
assert asyncio.iscoroutine(coro), coro
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra safety? (I'm almost certain we can be sure of that now...)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
assert asyncio.iscoroutine(coro), coro

Let's not add tests for the interpreter to the production code.

@pedro-psb pedro-psb force-pushed the add-timeout-to-immediate-task branch 3 times, most recently from 82ff903 to f46846b Compare March 22, 2025 03:18
Comment on lines 1031 to 1033
The stderr can be useful write asserts over log messages.
This is required because the pytest caplog fixture can't capture log messages
streamed from the pulpcore-manager call.
Copy link
Member

@mdellweg mdellweg Mar 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ohhhh!

Now I'm wondering if we could just use the capsys or capfd fixture instead? (If you want to try that.)
(I'd prefer not adding another fixture to do almost the same thing.)

Copy link
Member

@mdellweg mdellweg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the implementation is sound at this stage...
But still some things came up when reading through.

_logger.debug(_("Task is coroutine %s"), task.pk)
coro = func(*args, **kwargs)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's probably ok.

coro = result

coro = asyncio.wait_for(coro, timeout=IMMEDIATE_TIMEOUT)
assert asyncio.iscoroutine(coro), coro
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
assert asyncio.iscoroutine(coro), coro

Let's not add tests for the interpreter to the production code.

Comment on lines 99 to 106
except asyncio.TimeoutError:
raise RuntimeError(
_("Immediate task timed out after {} seconds").format(IMMEDIATE_TIMEOUT)
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This try block really only translates the timeout error to add more information, right?
If so, please add the task uuid also, and shove something in to the log stream of the worker.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this was supposed to add more info.
But it'll be handled by the outer try/catch-all at L107 (from changed code) and receive all the context info.
Or am I missing something?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. But i think we can still add something like:
logger.warn("Immediate task {task.pk} timed out after ...") So we see it in the worker and api worker logs too.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, ok. I've missed that comment

Comment on lines +466 to +468
task_href = dispatch_task(
"pulpcore.app.tasks.test.asleep", args=(LT_TIMEOUT,), immediate=True
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Incidentally, these kinds of tasks are now a big driver to wish for asgi pulp instead of wsgi.

raise RuntimeError("Timeout waiting for task to transition")

@contextmanager
def _dispatch_long_task(required_resources: list[str], duration=5):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let us stick to the established names exclusive_resources and shared_resources here.

args=(duration,),
exclusive_resources=required_resources,
)
wait_until("running", task_href)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see a reason to wait for anything else than an unfinished task in the queue.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My reasoning was that for the long task to block the immediate, it must have been picked up first.
So it could be running or cancelling, but not waiting (as we have priority on immediate tasks, it would be picked up first). So, is ["running", "canceling"] ok?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, that's not true. A task blocks a resource even in waiting state.



@pytest.fixture
def dispatch_long_task(pulpcore_bindings, dispatch_task):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now this fixture sounds kike it should be called resource_blocker.
That we use a task for it, is merely an implementation detail.

Comment on lines 531 to 523
task = pulpcore_bindings.TasksApi.read(task_href)
if task.state == "running":
pulpcore_bindings.TasksApi.tasks_cancel(task_href, {"state": "canceled"})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
task = pulpcore_bindings.TasksApi.read(task_href)
if task.state == "running":
pulpcore_bindings.TasksApi.tasks_cancel(task_href, {"state": "canceled"})
pulpcore_bindings.TasksApi.tasks_cancel(task_href, {"state": "canceled"})

No need to be polite to that task. Just make sure it goes away.
Maybe we even require it to use a non existing plugin so it will never be picked up at all.

Copy link
Member Author

@pedro-psb pedro-psb Mar 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the task has already finished this call will raise a 409 error on the pulpcore client.
E.g:

task_href = dispatch_task("pulpcore.app.tasks.test.sleep", args=(2,))
time.sleep(3)
pulpcore_bindings.TasksApi.tasks_cancel(task_href, {"state": "canceled"})  # <- raises

But l can just suppress that error instead, didn't think about that before.

assert task.worker is not None

@pytest.mark.parallel
def test_throws_when_non_deferrable(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a newly defined behaviour. Can you do me a favor and see whether we already have a test for this?

(Ideally we combine them or pick the nicer one...)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we don't test it elsewhere:

> git grep -I -n defer -- pulpcore/tests/
pulpcore/tests/functional/api/test_tasking.py:564:    def test_throws_when_non_deferrable(
pulpcore/tests/functional/api/test_tasking.py:569:        WHEN dispatching as immediate and not deferrable
pulpcore/tests/functional/api/test_tasking.py:578:                deferred=False,

assert "Resources temporarily unavailable." in task.error["reason"]

@pytest.mark.parallel
def test_timeouts_on_task_worker(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick mode: test_times_out_on_task_worker reads more English to me.

@pedro-psb pedro-psb force-pushed the add-timeout-to-immediate-task branch from 6658c42 to b2fca1e Compare March 24, 2025 15:13
@pedro-psb pedro-psb requested a review from mdellweg March 24, 2025 17:35
Comment on lines 492 to 489
assert "Support for non-coroutine immediate tasks will be dropped" in stderr
assert "Support for non-coroutine immediate tasks will be dropped" in stderr_content
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you! Not adding a new fixture for this makes me happier.

args=(duration,),
exclusive_resources=required_resources,
)
wait_until("running", task_href)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, that's not true. A task blocks a resource even in waiting state.

args=(duration,),
exclusive_resources=exclusive_resources,
)
wait_until(TASK_INCOMPLETE_STATES, task_href)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
wait_until(TASK_INCOMPLETE_STATES, task_href)

What do you want to wait for here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the wait doesnt make sense anymore. Removing...

But I wanted to share some thoughts I had on this (not wanting to change anything in this PR):

The long task will always be unblocked first because it has lower pulp_created, and the short is assured to be blocked as we intend in the test, right?

I was thinking if we shouldnt sort by immediate task there too (in the unblock_task). This is a violation of the event order, but so is our immediate task prioritization in iter_tasks. I can see there is a diffrence, and feels like it would be worse to violate that in unblocking-time than it is in picking-time, but that's not crystal clear to me.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well the thing is the immediate task will not be unblocked before the other task is in a final state. And that's the only thing needed to assure resource usage is serialized. It does not matter whether the prior task is identified unblocked or in a waiting, running or canceling state.

@pedro-psb pedro-psb force-pushed the add-timeout-to-immediate-task branch 2 times, most recently from d9b1386 to 6ab7dac Compare March 26, 2025 00:21
@pedro-psb pedro-psb force-pushed the add-timeout-to-immediate-task branch from 6ab7dac to 52da68c Compare March 26, 2025 00:25
@pedro-psb pedro-psb force-pushed the add-timeout-to-immediate-task branch from 799969a to d3e0269 Compare March 26, 2025 12:51
@pedro-psb pedro-psb force-pushed the add-timeout-to-immediate-task branch 2 times, most recently from f3b0a48 to ad769c9 Compare March 26, 2025 13:17
* Enforce immediate tasks are run are async function
* Add deprecation warning that immediate tasks must be async function
  starting from pulpcore 3.85
* Add tests to exercise immediate and defer dispatch options

Co-authored-by: Matthias Dellweg <[email protected]>
Closes: pulp#5930
@pedro-psb pedro-psb force-pushed the add-timeout-to-immediate-task branch from ad769c9 to dbae78a Compare March 26, 2025 13:54
@pedro-psb pedro-psb merged commit 6b12bb2 into pulp:main Mar 26, 2025
12 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add timeout to immediate tasks
2 participants