Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix initial issues with the structure and the example #5

Merged
merged 3 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .makim.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,18 @@ groups:
run: |
pytest ${{ args.path }} ${{ args.params }}

example:
help: Run the example app
shell: bash
run: |
set -ex
sugar build
sugar ext restart --options -d
sleep 5
cd example/
celery -A tasks.app worker --loglevel=debug &
python app.py

ci:
help: run the sames tests executed on CI
dependencies:
Expand Down
170 changes: 0 additions & 170 deletions example/tasks.py

This file was deleted.

17 changes: 17 additions & 0 deletions example/tasks/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
"""Tasks for the example."""

from retsu import TaskManager

from .parallel import MyParallelTask1
from .serial import MySerialTask1


class MyTaskManager(TaskManager):
"""MyTaskManager."""

def __init__(self) -> None:
"""Create a list of retsu tasks."""
self.tasks = {
"serial": MySerialTask1(),
"parallel": MyParallelTask1(workers=2),
}
4 changes: 4 additions & 0 deletions example/tasks/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
"""Module for the celery app."""

from .parallel import * # noqa: F403
from .serial import * # noqa: F403
43 changes: 43 additions & 0 deletions example/tasks/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
"""Configuration for Celery app."""

import redis

from celery import Celery

app = Celery(
"retsu",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/0",
)

LOG_FORMAT_PREFIX = "[%(asctime)s: %(levelname)s/%(processName)s]"

app.conf.update(
broker_url="redis://localhost:6379/0",
result_backend="redis://localhost:6379/0",
worker_log_format=f"{LOG_FORMAT_PREFIX} %(message)s",
worker_task_log_format=(
f"{LOG_FORMAT_PREFIX} %(task_name)s[%(task_id)s]: %(message)s"
),
task_annotations={"*": {"rate_limit": "10/s"}},
task_track_started=True,
task_time_limit=30 * 60,
task_soft_time_limit=30 * 60,
worker_redirect_stdouts_level="DEBUG",
)

redis_client = redis.Redis(
host="localhost",
port=6379,
db=0,
ssl=False,
)


try:
print("Pinging Redis...")
redis_client.ping()
print("Redis connection is working.")
except redis.ConnectionError as e:
print(f"Failed to connect to Redis: {e}")
exit(1)
66 changes: 66 additions & 0 deletions example/tasks/parallel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
"""My retsu tasks."""

from __future__ import annotations

from time import sleep

import celery

from retsu import ResultTask
from retsu.celery import ParallelCeleryTask

from .config import app, redis_client


@app.task
def task_parallel_a1(a: int, b: int, task_id: str) -> int: # type: ignore
"""Define the task_a1."""
sleep(a + b)
print("running task a1")
result = a + b
redis_client.set(f"result-{task_id}", result)
return result


@app.task
def task_parallel_a2(task_id: str) -> int: # type: ignore
"""Define the task_a2."""
print("running task a2")
result = redis_client.get(f"result-{task_id}")
return result


@app.task
def task_parallel_final(results, task_id: str) -> int: # type: ignore
"""Define the final_task."""
print("running final task")

result = redis_client.get(f"result-{task_id}")
final_result = f"Final result: {result}"
print(final_result)

task_result = ResultTask()

task_result.save(task_id=task_id, result=final_result)

return final_result


class MyParallelTask1(ParallelCeleryTask):
"""MyParallelTask1."""

def request(self, a: int, b: int) -> str:
"""Receive the request for processing."""
return super().request(a=a, b=b)

def get_chord_tasks(
self, a: int, b: int, task_id: str
) -> list[celery.Signature]:
"""Define the list of tasks for celery chord."""
return (
[
task_parallel_a1.s(a, b, task_id),
task_parallel_a2.s(task_id),
],
task_parallel_final.s(task_id),
)
66 changes: 66 additions & 0 deletions example/tasks/serial.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
"""My retsu tasks."""

from __future__ import annotations

from time import sleep

import celery

from retsu import ResultTask
from retsu.celery import SerialCeleryTask

from .config import app, redis_client


@app.task
def task_serial_a1(a: int, b: int, task_id: str) -> int: # type: ignore
"""Define the task_a1."""
sleep(a + b)
print("running task a1")
result = a + b
redis_client.set(f"result-{task_id}", result)
return result


@app.task
def task_serial_a2(task_id: str) -> int: # type: ignore
"""Define the task_a2."""
print("running task a2")
result = redis_client.get(f"result-{task_id}")
return result


@app.task
def task_serial_final(results, task_id: str) -> int: # type: ignore
"""Define the final_task."""
print("running final task")

result = redis_client.get(f"result-{task_id}")
final_result = f"Final result: {result}"
print(final_result)

task_result = ResultTask()

task_result.save(task_id=task_id, result=final_result)

return final_result


class MySerialTask1(SerialCeleryTask):
"""MySerialTask1."""

def request(self, a: int, b: int) -> str:
"""Receive the request for processing."""
return super().request(a=a, b=b)

def get_chord_tasks(
self, a: int, b: int, task_id: str
) -> tuple[list[celery.Signature], celery.Signature]:
"""Define the list of tasks for celery chord."""
return (
[
task_serial_a1.s(a, b, task_id),
task_serial_a2.s(task_id),
],
task_serial_final.s(task_id),
)
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ src = ["./"]
exclude = [
'docs',
]
fix = true

[tool.ruff.lint]
select = [
Expand Down
Loading
Loading