Skip to content

Commit

Permalink
feat: add StreamActivatedJobs
Browse files Browse the repository at this point in the history
  • Loading branch information
dimastbk committed Feb 27, 2025
1 parent 2701e7f commit 6bdfbe1
Show file tree
Hide file tree
Showing 10 changed files with 304 additions and 15 deletions.
2 changes: 2 additions & 0 deletions pyzeebe/errors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
ActivateJobsRequestInvalidError,
JobAlreadyDeactivatedError,
JobNotFoundError,
StreamActivateJobsRequestInvalidError,
)
from .message_errors import MessageAlreadyExistsError
from .process_errors import (
Expand Down Expand Up @@ -34,6 +35,7 @@
__all__ = (
"InvalidOAuthCredentialsError",
"ActivateJobsRequestInvalidError",
"StreamActivateJobsRequestInvalidError",
"JobAlreadyDeactivatedError",
"JobNotFoundError",
"MessageAlreadyExistsError",
Expand Down
13 changes: 13 additions & 0 deletions pyzeebe/errors/job_errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,19 @@ def __init__(self, task_type: str, worker: str, timeout: int, max_jobs_to_activa
super().__init__(msg)


class StreamActivateJobsRequestInvalidError(PyZeebeError):
    """Raised when a StreamActivatedJobs request is rejected as invalid.

    The error message lists every argument that fails the local checks
    below, so the caller can see which value made the request invalid.

    Args:
        task_type: The job type that was requested.
        worker: The worker name that was sent with the request.
        timeout: The job timeout in milliseconds that was requested.
    """

    def __init__(self, task_type: str, worker: str, timeout: int) -> None:
        reasons = []
        if not task_type:
            reasons.append("task_type is empty")
        if not worker:
            reasons.append("worker is empty")
        # Guard against None so that building the error message can never
        # raise a TypeError that would hide the original failure.
        if timeout is None or timeout < 1:
            reasons.append("job timeout is smaller than 1ms")

        msg = "Failed to activate jobs. Reasons: " + ", ".join(reasons)
        super().__init__(msg)


class JobAlreadyDeactivatedError(PyZeebeError):
def __init__(self, job_key: int) -> None:
super().__init__(f"Job {job_key} was already stopped (Completed/Failed/Error)")
Expand Down
7 changes: 5 additions & 2 deletions pyzeebe/grpc_internals/zeebe_adapter_base.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import logging
from typing import NoReturn
from typing import TYPE_CHECKING, NoReturn, cast

import grpc

Expand All @@ -14,13 +14,16 @@
from pyzeebe.grpc_internals.grpc_utils import is_error_status
from pyzeebe.proto.gateway_pb2_grpc import GatewayStub

if TYPE_CHECKING:
from pyzeebe.proto.gateway_pb2_grpc import GatewayAsyncStub

logger = logging.getLogger(__name__)


class ZeebeAdapterBase:
def __init__(self, grpc_channel: grpc.aio.Channel, max_connection_retries: int = -1):
self._channel = grpc_channel
self._gateway_stub = GatewayStub(grpc_channel)
self._gateway_stub = cast("GatewayAsyncStub", GatewayStub(grpc_channel))
self._connected = True
self.retrying_connection = False
self._max_connection_retries = max_connection_retries
Expand Down
30 changes: 30 additions & 0 deletions pyzeebe/grpc_internals/zeebe_job_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
ActivateJobsRequestInvalidError,
JobAlreadyDeactivatedError,
JobNotFoundError,
StreamActivateJobsRequestInvalidError,
)
from pyzeebe.grpc_internals.grpc_utils import is_error_status
from pyzeebe.grpc_internals.zeebe_adapter_base import ZeebeAdapterBase
Expand All @@ -20,6 +21,7 @@
ActivateJobsRequest,
CompleteJobRequest,
FailJobRequest,
StreamActivatedJobsRequest,
ThrowErrorRequest,
)
from pyzeebe.types import Variables
Expand Down Expand Up @@ -65,6 +67,34 @@ async def activate_jobs(
raise ActivateJobsRequestInvalidError(task_type, worker, timeout, max_jobs_to_activate) from grpc_error
await self._handle_grpc_error(grpc_error)

async def stream_activate_jobs(
    self,
    task_type: str,
    worker: str,
    timeout: int,
    variables_to_fetch: Iterable[str],
    stream_request_timeout: int,
    tenant_ids: Iterable[str] | None = None,
) -> AsyncGenerator[Job]:
    """Open a StreamActivatedJobs call and yield every job it delivers.

    Args:
        task_type: Sent as the ``type`` field of the request.
        worker: Sent as the ``worker`` field of the request.
        timeout: Sent as the ``timeout`` field of the request.
        variables_to_fetch: Sent as the ``fetchVariable`` field of the request.
        stream_request_timeout: Passed as ``timeout`` to the gRPC call itself.
        tenant_ids: Sent as ``tenantIds``; a falsy value is sent as an empty list.

    Yields:
        One ``Job`` per raw job received from the stream.

    Raises:
        StreamActivateJobsRequestInvalidError: If the call fails with gRPC
            status ``INVALID_ARGUMENT``.

    Any other ``grpc.aio.AioRpcError`` is handed to ``self._handle_grpc_error``.
    """
    try:
        open_stream = self._gateway_stub.StreamActivatedJobs
        request = StreamActivatedJobsRequest(
            type=task_type,
            worker=worker,
            timeout=timeout,
            fetchVariable=variables_to_fetch,
            tenantIds=tenant_ids or [],
        )
        activated_jobs = open_stream(request, timeout=stream_request_timeout)

        async for activated_job in activated_jobs:
            job = self._create_job_from_raw_job(activated_job)
            logger.debug("Got job: %s from zeebe", job)
            yield job
    except grpc.aio.AioRpcError as rpc_error:
        rejected_as_invalid = is_error_status(rpc_error, grpc.StatusCode.INVALID_ARGUMENT)
        if rejected_as_invalid:
            raise StreamActivateJobsRequestInvalidError(task_type, worker, timeout) from rpc_error
        await self._handle_grpc_error(rpc_error)

def _create_job_from_raw_job(self, response: ActivatedJob) -> Job:
return Job(
key=response.key,
Expand Down
72 changes: 72 additions & 0 deletions pyzeebe/worker/job_poller.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@

import asyncio
import logging
from collections.abc import AsyncGenerator

from pyzeebe.errors import (
ActivateJobsRequestInvalidError,
StreamActivateJobsRequestInvalidError,
ZeebeBackPressureError,
ZeebeDeadlineExceeded,
ZeebeGatewayUnavailableError,
Expand Down Expand Up @@ -94,3 +96,73 @@ def calculate_max_jobs_to_activate(self) -> int:
async def stop(self) -> None:
    """Set the stop event, then wait until the job queue has been fully processed."""
    self.stop_event.set()
    # Queue.join() returns once every item put on the queue has been marked done.
    await self.queue.join()


class JobStreamer:
    """Feed jobs received from a gateway job stream into a task's job queue.

    A stream is opened through ``zeebe_adapter.stream_activate_jobs``; every
    job it yields is registered in ``task_state`` and put on ``queue``.
    When a stream ends or fails with a retryable gateway error, a new stream
    is opened for as long as ``_should_poll`` returns True.

    Args:
        zeebe_adapter: Adapter used to open the job stream.
        task: The task whose type and config parameterize the stream.
        queue: Queue that received jobs are put on.
        worker_name: Name sent to the gateway as the worker.
        stream_request_timeout: Timeout passed to the underlying stream call.
        task_state: Registry that every received job is added to.
        tenant_ids: Tenant IDs to stream jobs for, or None.
        retry_delay: Seconds to wait before reopening the stream after a
            retryable gateway error.
    """

    def __init__(
        self,
        zeebe_adapter: ZeebeJobAdapter,
        task: Task,
        queue: asyncio.Queue[Job],
        worker_name: str,
        stream_request_timeout: int,
        task_state: TaskState,
        tenant_ids: list[str] | None,
        retry_delay: int = 5,
    ) -> None:
        self.zeebe_adapter = zeebe_adapter
        self.task = task
        self.queue = queue
        self.worker_name = worker_name
        self.stream_request_timeout = stream_request_timeout
        self.task_state = task_state
        self.tenant_ids = tenant_ids
        self.retry_delay = retry_delay
        self.stop_event = asyncio.Event()

    def _create_stream(self) -> AsyncGenerator[Job]:
        """Return a new job stream for this task from the adapter."""
        return self.zeebe_adapter.stream_activate_jobs(
            task_type=self.task.type,
            worker=self.worker_name,
            timeout=self.task.config.timeout_ms,
            variables_to_fetch=self.task.config.variables_to_fetch or [],
            stream_request_timeout=self.stream_request_timeout,
            tenant_ids=self.tenant_ids,
        )

    async def poll(self) -> None:
        """Keep opening job streams until ``_should_poll`` returns False."""
        while self._should_poll():
            await self._activate_stream()

    async def _activate_stream(self) -> None:
        """Consume one job stream until it ends or fails.

        Raises:
            StreamActivateJobsRequestInvalidError: Re-raised after logging,
                because retrying the same invalid request cannot succeed.
        """
        try:
            async for job in self._create_stream():
                self.task_state.add(job)
                await self.queue.put(job)
        except StreamActivateJobsRequestInvalidError:
            logger.warning("Stream job requests was invalid for task %s", self.task.type)
            raise
        except (
            ZeebeBackPressureError,
            ZeebeGatewayUnavailableError,
            ZeebeInternalError,
            ZeebeDeadlineExceeded,
        ) as error:
            # These errors are treated as transient: back off, then let
            # poll() open a fresh stream on its next iteration.
            logger.warning(
                "Failed to stream jobs from the gateway. Exception: %s. Retrying in %s seconds...",
                repr(error),
                self.retry_delay,
            )
            await asyncio.sleep(self.retry_delay)

    def _should_poll(self) -> bool:
        """Return True while not stopped and the adapter is connected or reconnecting."""
        return not self.stop_event.is_set() and (self.zeebe_adapter.connected or self.zeebe_adapter.retrying_connection)

    async def stop(self) -> None:
        """Set the stop event, then wait until the job queue has been fully processed.

        The stop event is only checked by ``poll`` between stream attempts.
        """
        self.stop_event.set()
        await self.queue.join()
46 changes: 36 additions & 10 deletions pyzeebe/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from pyzeebe.task import task_builder
from pyzeebe.task.exception_handler import ExceptionHandler
from pyzeebe.worker.job_executor import JobExecutor
from pyzeebe.worker.job_poller import JobPoller
from pyzeebe.worker.job_poller import JobPoller, JobStreamer
from pyzeebe.worker.task_router import ZeebeTaskRouter
from pyzeebe.worker.task_state import TaskState

Expand All @@ -34,6 +34,8 @@ def __init__(
poll_retry_delay: int = 5,
tenant_ids: list[str] | None = None,
exception_handler: ExceptionHandler | None = None,
stream_enabled: bool = False,
stream_request_timeout: int = 3600,
):
"""
Args:
Expand All @@ -46,6 +48,9 @@ def __init__(
max_connection_retries (int): Amount of connection retries before worker gives up on connecting to zeebe. To setup with infinite retries use -1
poll_retry_delay (int): The number of seconds to wait before attempting to poll again when reaching max amount of running jobs
tenant_ids (list[str]): A list of tenant IDs for which to activate jobs. New in Zeebe 8.3.
stream_enabled (bool): Enables the job worker to stream jobs. It will still poll for older jobs, but streaming is favored. New in Zeebe 8.4.
stream_request_timeout (int): If streaming is enabled, this sets the timeout on the underlying job stream.
It's useful to set a few hours to load-balance your streams over time. New in Zeebe 8.4.
"""
super().__init__(before, after, exception_handler)
self.zeebe_adapter = ZeebeAdapter(grpc_channel, max_connection_retries)
Expand All @@ -54,31 +59,46 @@ def __init__(
self.poll_retry_delay = poll_retry_delay
self.tenant_ids = tenant_ids
self._job_pollers: list[JobPoller] = []
self._job_streamers: list[JobStreamer] = []
self._job_executors: list[JobExecutor] = []
self._stop_event = anyio.Event()
self._stream_enabled = stream_enabled
self._stream_request_timeout = stream_request_timeout

def _init_tasks(self) -> None:
    """Rebuild the poller, executor and streamer lists from ``self.tasks``.

    Each task gets its own job queue and ``TaskState``. Both are shared by
    that task's poller, executor and, when streaming is enabled, streamer.
    """
    self._job_executors, self._job_pollers, self._job_streamers = [], [], []

    for task in self.tasks:
        jobs_queue = asyncio.Queue[Job]()
        task_state = TaskState()

        poller = JobPoller(
            zeebe_adapter=self.zeebe_adapter,
            task=task,
            queue=jobs_queue,
            worker_name=self.name,
            request_timeout=self.request_timeout,
            task_state=task_state,
            poll_retry_delay=self.poll_retry_delay,
            tenant_ids=self.tenant_ids,
        )
        executor = JobExecutor(task, jobs_queue, task_state, self.zeebe_adapter)

        self._job_pollers.append(poller)
        self._job_executors.append(executor)

        # The streamer is created in addition to the poller, not instead of it.
        if self._stream_enabled:
            streamer = JobStreamer(
                zeebe_adapter=self.zeebe_adapter,
                task=task,
                queue=jobs_queue,
                worker_name=self.name,
                stream_request_timeout=self._stream_request_timeout,
                task_state=task_state,
                tenant_ids=self.tenant_ids,
            )
            self._job_streamers.append(streamer)

async def work(self) -> None:
"""
Start the worker. The worker will poll zeebe for jobs of each task in a different asyncio task.
Expand All @@ -97,6 +117,9 @@ async def work(self) -> None:
for poller in self._job_pollers:
tg.start_soon(poller.poll)

for streamer in self._job_streamers:
tg.start_soon(streamer.poll)

for executor in self._job_executors:
tg.start_soon(executor.execute)

Expand All @@ -113,6 +136,9 @@ async def stop(self) -> None:
for poller in self._job_pollers:
await poller.stop()

for streamer in self._job_streamers:
await streamer.stop()

for executor in self._job_executors:
await executor.stop()

Expand Down
48 changes: 48 additions & 0 deletions tests/unit/grpc_internals/zeebe_job_adapter_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
ActivateJobsRequestInvalidError,
JobAlreadyDeactivatedError,
JobNotFoundError,
StreamActivateJobsRequestInvalidError,
)
from pyzeebe.grpc_internals.types import (
CompleteJobResponse,
Expand Down Expand Up @@ -89,6 +90,53 @@ async def test_raises_on_invalid_max_jobs(self):
await jobs.__anext__()


@pytest.mark.asyncio
class TestStreamActivateJobs:
    """Unit tests for ``ZeebeJobAdapter.stream_activate_jobs``."""

    zeebe_job_adapter: ZeebeJobAdapter

    @pytest.fixture(autouse=True)
    def set_up(self, zeebe_adapter: ZeebeJobAdapter):
        # Keep the adapter fixture on the instance so the helper below can use it.
        self.zeebe_job_adapter = zeebe_adapter

    def stream_activate_jobs(
        self,
        task_type=str(uuid4()),
        worker=str(uuid4()),
        timeout=randint(10, 100),
        request_timeout=100,
        variables_to_fetch=(),
        tenant_ids=None,
    ):
        """Call ``stream_activate_jobs`` on the adapter, filling in defaults.

        The random defaults are evaluated once, when the class is defined.
        ``variables_to_fetch`` defaults to an immutable empty tuple so that no
        mutable object is shared between calls.
        """
        return self.zeebe_job_adapter.stream_activate_jobs(
            task_type, worker, timeout, variables_to_fetch, request_timeout, tenant_ids
        )

    async def test_returns_correct_amount_of_jobs(self, grpc_servicer: GatewayMock, task: Task):
        active_jobs_count = randint(4, 100)
        for _ in range(active_jobs_count):
            job = random_job(task)
            grpc_servicer.active_jobs[job.key] = job

        jobs = self.stream_activate_jobs(task_type=task.type)

        assert len([job async for job in jobs]) == active_jobs_count

    async def test_raises_on_invalid_worker(self):
        with pytest.raises(StreamActivateJobsRequestInvalidError):
            jobs = self.stream_activate_jobs(worker=None)
            # The generator body only runs once the first item is requested.
            await jobs.__anext__()

    async def test_raises_on_invalid_job_timeout(self):
        with pytest.raises(StreamActivateJobsRequestInvalidError):
            jobs = self.stream_activate_jobs(timeout=0)
            await jobs.__anext__()

    async def test_raises_on_invalid_task_type(self):
        with pytest.raises(StreamActivateJobsRequestInvalidError):
            jobs = self.stream_activate_jobs(task_type=None)
            await jobs.__anext__()


@pytest.mark.asyncio
class TestCompleteJob:
async def test_response_is_of_correct_type(self, zeebe_adapter: ZeebeJobAdapter, first_active_job: Job):
Expand Down
Loading

0 comments on commit 6bdfbe1

Please sign in to comment.