Skip to content

Commit

Permalink
Add tests for parallel tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
xmnlab committed May 30, 2024
1 parent 602fa04 commit 60a159f
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 20 deletions.
93 changes: 93 additions & 0 deletions tests/test_task_parallel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
"""Tests for retsu package."""

from __future__ import annotations

from datetime import datetime
from time import sleep
from typing import Any, Generator

import pytest

from retsu import ParallelTask, Task


class MyResultTask(ParallelTask):
"""Task for the test."""

def task(self, *args, task_id: str, **kwargs) -> Any: # type: ignore
"""Return the sum of the given 2 numbers."""
a = kwargs.pop("a", 0)
b = kwargs.pop("b", 0)
result = a + b
return result


class MyTimestampTask(ParallelTask):
"""Task for the test."""

def task(self, *args, task_id: str, **kwargs) -> Any: # type: ignore
"""Sleep the given seconds, and return the current timestamp."""
sleep_time = kwargs.pop("sleep", 0)
sleep(sleep_time)
return datetime.now().timestamp()


@pytest.fixture
def task_result() -> Generator[Task, None, None]:
"""Create a fixture for MyResultTask."""
task = MyResultTask(workers=10)
task.start()
yield task
task.stop()


@pytest.fixture
def task_timestamp() -> Generator[Task, None, None]:
"""Create a fixture for MyResultTask."""
task = MyTimestampTask(workers=10)
task.start()
yield task
task.stop()


class TestParallelTask:
"""TestParallelTask."""

def test_parallel_result(self, task_result: Task) -> None:
"""Run simple test for a parallel task."""
results: dict[str, int] = {}

task = task_result

for i in range(10):
task_id = task.request(a=i, b=i)
results[task_id] = i + i

for task_id, expected in results.items():
result = task.result.get(task_id, timeout=2)
assert (
result == expected
), f"Expected Result: {expected}, Actual Result: {result}"

def test_parallel_timestamp(self, task_timestamp: Task) -> None:
"""Run simple test for a parallel task."""
results: list[tuple[str, int]] = []

task = task_timestamp

for sleep_time in range(5, 1, -1):
task_id = task.request(sleep=sleep_time)
results.append((task_id, 0))

# gather results
for i, (task_id, _) in enumerate(results):
results[i] = (task_id, task.result.get(task_id, timeout=10))

# check results
previous_timestamp = results[0][1]
for _, current_timestamp in results[1:]:
assert current_timestamp < previous_timestamp, (
f"Previous timestamp: {previous_timestamp}, "
f"Current timestamp: {current_timestamp}"
)
previous_timestamp = current_timestamp
81 changes: 61 additions & 20 deletions tests/test_task_serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,51 +2,92 @@

from __future__ import annotations

from typing import Any
from datetime import datetime
from time import sleep
from typing import Any, Generator

import pytest

from retsu import SerialTask, Task


class MyTask(SerialTask):
class MyResultTask(SerialTask):
"""Task for the test."""

def task(self, *args, task_id: str, **kwargs) -> Any: # type: ignore
"""Task that sum the given 2 numbers."""
"""Return the sum of the given 2 numbers."""
a = kwargs.pop("a", 0)
b = kwargs.pop("b", 0)
return a + b
result = a + b
return result


class MyTimestampTask(SerialTask):
"""Task for the test."""

def task(self, *args, task_id: str, **kwargs) -> Any: # type: ignore
"""Sleep the given seconds, and return the current timestamp."""
sleep_time = kwargs.pop("sleep", 0)
sleep(sleep_time)
return datetime.now().timestamp()

class SetupTask:
"""Setup the task workflow for the test."""

task: Task
@pytest.fixture
def task_result() -> Generator[Task, None, None]:
"""Create a fixture for MyResultTask."""
task = MyResultTask()
task.start()
yield task
task.stop()

@classmethod
def setup_class(cls) -> None:
"""Set the class configuration for the test."""
cls.task = MyTask()
cls.task.start()

@classmethod
def teardown_class(cls) -> None:
"""Set the class teardown step for the test."""
cls.task.stop()
@pytest.fixture
def task_timestamp() -> Generator[Task, None, None]:
"""Create a fixture for MyResultTask."""
task = MyTimestampTask()
task.start()
yield task
task.stop()


class TestSerialTask(SetupTask):
class TestSerialTask:
"""TestSerialTask."""

def test_serial_simple(self) -> None:
def test_serial_result(self, task_result: Task) -> None:
"""Run simple test for a serial task."""
results: dict[str, int] = {}

task = task_result

for i in range(10):
task_id = self.task.request(a=i, b=i)
task_id = task.request(a=i, b=i)
results[task_id] = i + i

for task_id, expected in results.items():
result = self.task.result.get(task_id, timeout=2)
result = task.result.get(task_id, timeout=2)
assert (
result == expected
), f"Expected Result: {expected}, Actual Result: {result}"

def test_serial_timestamp(self, task_timestamp: Task) -> None:
"""Run simple test for a serial task."""
results: list[tuple[str, int]] = []

task = task_timestamp

for sleep_time in range(5, 1, -1):
task_id = task.request(sleep=sleep_time)
results.append((task_id, 0))

# gather results
for i, (task_id, _) in enumerate(results):
results[i] = (task_id, task.result.get(task_id, timeout=10))

# check results
previous_timestamp = results[0][1]
for _, current_timestamp in results[1:]:
assert current_timestamp > previous_timestamp, (
f"Previous timestamp: {previous_timestamp}, "
f"Current timestamp: {current_timestamp}"
)
previous_timestamp = current_timestamp

0 comments on commit 60a159f

Please sign in to comment.