-
Notifications
You must be signed in to change notification settings - Fork 6
Description
Hey! Thank you for the library.
For some reason, I would use CBOR serializer instead of JSON.
The system could be configured easily enough by overloading Pipeline
and AbstractStep
dumps()
and loads()
methods. It works pretty well at least with aio-pika broker thanks to the fact that pika headers could be not only strings but also bytes.
The solution looks a little cumbersome though, it requires overloading more methods than it should be.
As a proper and generic resolution of the problem, I could imagine adding a new abstraction: AsyncBroker.serializer. It should be an instance of the following class:
class TaskiqSerializer(ABC):
@abstractmethod
def dumps(self, value: Any) -> bytes: ...
@abstractmethod
def loads(self, value: bytes) -> Any: ...
The difference between TaskiqSerializer
and TaskiqFormatter
is that TaskiqFormatter
works with TaskiqMessage
only but serializer should operate with any primitive types that are supported by underlying serialization system (things like decimals, datetimes, etc). Serializer supersedes formatter ABC and could be used instead with deprecation of TaskiqFormatter
class and AsyncBroker.formatter
attribute.
Another subtle problem is the type of TaskiqMessage.headers
and BrokerMessage.headers
. I don't think it is the showstopper; AsyncTaskiqDecoratedTask.labels
is Dict[str, Any]
already which is the correct type I believe.
What do you think? If you agree with my thoughts I can try to create PR for taskiq itself first and modify taskiq-pipelines
later.