Skip to content

Commit

Permalink
Merge pull request #62 from schireson/dc/pipe-ignore
Browse files Browse the repository at this point in the history
feat: Add pipe_ignore option to enqueue.
  • Loading branch information
DanCardin authored Apr 1, 2024
2 parents 618f307 + 6e23aa8 commit 72394b2
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 2 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "strapp"
version = "0.3.21"
version = "0.3.22"
description = ""
authors = []
packages = [
Expand Down
3 changes: 2 additions & 1 deletion src/strapp/dramatiq/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from strapp.dramatiq.base import configure, enqueue, get_result, PreparedActor
from strapp.dramatiq.base import configure, enqueue, get_result, PreparedActor, message

__all__ = [
"PreparedActor",
"configure",
"enqueue",
"get_result",
"message",
]
7 changes: 7 additions & 0 deletions src/strapp/dramatiq/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ def enqueue(
queue_name="default",
broker=None,
pipe_target: Optional[dramatiq.Message] = None,
pipe_ignore: bool = False,
**kwargs,
) -> dramatiq.Message:
"""Enqueue work onto the queue, by `task_name`.
Expand All @@ -108,6 +109,7 @@ def enqueue(
queue_name: optional queue name. defaults to "default".
broker: Overrides the global broker
pipe_target: Optional pipe target. This is used to chain tasks together.
pipe_target: When True, ignores the result of the previous actor in the pipeline.
**kwargs: Passed through to the corresponding `@actor` function. Must be json serializable.
"""
if broker is None:
Expand All @@ -117,6 +119,7 @@ def enqueue(
task_name,
queue_name=queue_name,
pipe_target=pipe_target,
pipe_ignore=pipe_ignore,
**kwargs,
)
return broker.enqueue(m)
Expand All @@ -127,13 +130,17 @@ def message(
*,
queue_name="default",
pipe_target: Optional[dramatiq.Message] = None,
pipe_ignore: bool = False,
**kwargs,
) -> dramatiq.Message:
"""Create a dramatiq message."""
options = {}
if pipe_target:
options["pipe_target"] = pipe_target.asdict()

if pipe_ignore:
options["pipe_ignore"] = True

return dramatiq.Message(
queue_name=queue_name,
actor_name=task_name,
Expand Down

0 comments on commit 72394b2

Please sign in to comment.