Bug: Custom Serialization/Deserialization logic in Redis doesn't work unless UTF-8 serializable #2061

Open
chrisgoddard opened this issue Feb 5, 2025 · 8 comments
Labels
bug Something isn't working good first issue Good for newcomers Redis Issues related to `faststream.redis` module and Redis features

Comments

@chrisgoddard

Describe the bug
I am working on a middleware to add msgpack serialization to all messages (using ormsgpack, which natively handles Pydantic models).

The issue seems to be that FastStream JSON-serializes the message together with its headers here:

# faststream/redis/parser.py
class RawMessage:
    # line: 85
    @classmethod
    def encode(
        cls,
        *,
        message: Union[Sequence["SendableMessage"], "SendableMessage"],
        reply_to: Optional[str],
        headers: Optional["AnyDict"],
        correlation_id: str,
    ) -> bytes:
        msg = cls.build(
            message=message,
            reply_to=reply_to,
            headers=headers,
            correlation_id=correlation_id,
        )

        return dump_json(
            {
                "data": msg.data,
                "headers": msg.headers,
            }
        )

Technically, msg.data is supposed to accept bytes, but in practice it has to be valid UTF-8 or an exception is raised:

│ /.../.venv/lib/python3.12/site-packages/faststream/redis/publisher/producer.py:79 in publish │
│                                                                                       │
│    76 │   │   │   psub = self._connection.pubsub()                                    │
│    77 │   │   │   await psub.subscribe(reply_to)                                      │
│    78 │   │                                                                           │
│ ❱  79 │   │   msg = RawMessage.encode(                                                │
│    80 │   │   │   message=message,                                                    │
│    81 │   │   │   reply_to=reply_to,                                                  │
│    82 │   │   │   headers=headers,                                                    │
│                                                                                       │
│ ╭───────────────────────────────────── locals ──────────────────────────────────────╮ │
│ │        channel = None                                                             │ │
│ │ correlation_id = '2ac5886a-736d-4712-b878-448b8b041f43'                           │ │
│ │        headers = None                                                             │ │
│ │           list = 'job-queue'                                                      │ │
│ │         maxlen = None                                                             │ │
│ │        message = b'\x82\xa3url\xb1http://kelly.com/\xa4type\xa3far'               │ │
│ │           psub = None                                                             │ │
│ │  raise_timeout = False                                                            │ │
│ │       reply_to = ''                                                               │ │
│ │            rpc = False                                                            │ │
│ │    rpc_timeout = 30.0                                                             │ │
│ │           self = <faststream.redis.publisher.producer.RedisFastProducer object at │ │
│ │                  0x1137972f0>                                                     │ │
│ │         stream = None                                                             │ │
│ ╰───────────────────────────────────────────────────────────────────────────────────╯ │
│                                                                                       │
│ /.../.venv/lib/python3.12/site-packages/faststream/redis/parser.py:101 in encode      │
│                                                                                       │
│    98 │   │   │   correlation_id=correlation_id,                                      │
│    99 │   │   )                                                                       │
│   100 │   │                                                                           │
│ ❱ 101 │   │   return dump_json(                                                       │
│   102 │   │   │   {                                                                   │
│   103 │   │   │   │   "data": msg.data,                                               │
│   104 │   │   │   │   "headers": msg.headers,                                         │
│                                                                                       │
│ ╭────────────────────────────────── locals ───────────────────────────────────╮       │
│ │ correlation_id = '2ac5886a-736d-4712-b878-448b8b041f43'                     │       │
│ │        headers = None                                                       │       │
│ │        message = b'\x82\xa3url\xb1http://kelly.com/\xa4type\xa3far'         │       │
│ │            msg = <faststream.redis.parser.RawMessage object at 0x10e69c790> │       │
│ │       reply_to = ''                                                         │       │
│ ╰─────────────────────────────────────────────────────────────────────────────╯       │
│                                                                                       │
│ /.../.venv/lib/python3.12/site-packages/faststream/_compat.py:93 in dump_json         │
│                                                                                       │
│    90 │   │   return to_jsonable_python(model, **kwargs)                              │
│    91 │                                                                               │
│    92 │   def dump_json(data: Any) -> bytes:                                          │
│ ❱  93 │   │   return json_dumps(model_to_jsonable(data))                              │
│    94 │                                                                               │
│    95 │   def get_model_fields(model: Type[BaseModel]) -> Dict[str, Any]:             │
│    96 │   │   return model.model_fields                                               │
│                                                                                       │
│ ╭───────────────────────────────────── locals ─────────────────────────────────────╮  │
│ │ data = {                                                                         │  │
│ │        │   'data': b'\x82\xa3url\xb1http://kelly.com/\xa4type\xa3far',           │  │
│ │        │   'headers': {                                                          │  │
│ │        │   │   'correlation_id': '2ac5886a-736d-4712-b878-448b8b041f43'          │  │
│ │        │   }                                                                     │  │
│ │        }                                                                         │  │
│ ╰──────────────────────────────────────────────────────────────────────────────────╯  │
│                                                                                       │
│ /.../.venv/lib/python3.12/site-packages/faststream/_compat.py:90 in model_to_jsonable │
│                                                                                       │
│    87 │   │   model: BaseModel,                                                       │
│    88 │   │   **kwargs: Any,                                                          │
│    89 │   ) -> Any:                                                                   │
│ ❱  90 │   │   return to_jsonable_python(model, **kwargs)                              │
│    91 │                                                                               │
│    92 │   def dump_json(data: Any) -> bytes:                                          │
│    93 │   │   return json_dumps(model_to_jsonable(data))                              │
│                                                                                       │
│ ╭───────────────────────────────────── locals ──────────────────────────────────────╮ │
│ │ kwargs = {}                                                                       │ │
│ │  model = {                                                                        │ │
│ │          │   'data': b'\x82\xa3url\xb1http://kelly.com/\xa4type\xa3far',          │ │
│ │          │   'headers': {                                                         │ │
│ │          │   │   'correlation_id': '2ac5886a-736d-4712-b878-448b8b041f43'         │ │
│ │          │   }                                                                    │ │
│ │          }                                                                        │ │
│ ╰───────────────────────────────────────────────────────────────────────────────────╯ │
╰───────────────────────────────────────────────────────────────────────────────────────╯
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x82 in position 0: invalid utf-8
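
The final UnicodeDecodeError can be reproduced without FastStream or Redis. The sketch below uses only the standard library and assumes, as the traceback suggests, that the JSON conversion step tries to decode bytes values as UTF-8. The helper name `is_utf8` is made up for illustration and is not part of any library.

```python
# The msgpack payload copied from the traceback locals above.
MSGPACK_PAYLOAD = b"\x82\xa3url\xb1http://kelly.com/\xa4type\xa3far"

# A JSON body of similar content, for comparison.
JSON_PAYLOAD = b'{"url": "http://kelly.com/", "type": "far"}'


def is_utf8(data: bytes) -> bool:
    """Return True if the bytes can be decoded as UTF-8 text."""
    try:
        data.decode("utf-8")
    except UnicodeDecodeError:
        return False
    return True


if __name__ == "__main__":
    # The leading byte 0x82 is a UTF-8 continuation byte, so it cannot start a character.
    print("msgpack payload is UTF-8:", is_utf8(MSGPACK_PAYLOAD))
    print("JSON payload is UTF-8:", is_utf8(JSON_PAYLOAD))
```

Any binary format whose output is not guaranteed to be valid UTF-8 (msgpack, protobuf, compressed data) would presumably hit the same wall.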

Here's some test code:

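import ormsgpack
import tqdm
from faker import Faker
from faststream import FastStream
from faststream.redis import RedisBroker
from pydantic import BaseModel

broker = RedisBroker()
app = FastStream(broker)

# Assumed shape of the Request model; it is not shown in the original report.
class Request(BaseModel):
    url: str
    type: str

@app.after_startup
async def startup():
    faker = Faker("en_US")

    urls = [faker.url() for _ in range(10)]

    for url in tqdm.tqdm(urls):
        obj = Request(
            url=url,
            type=faker.word(),
        )
        # Publishing msgpack bytes is what triggers the error on the producer side.
        await broker.publish(
            ormsgpack.packb(obj, option=ormsgpack.OPT_SERIALIZE_PYDANTIC),
            list="job-queue",
        )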

The error occurs on the producer side, and the only workaround I've found so far is a runtime monkey-patch of the RawMessage.encode method to do the msgpack serialization at the final message encoding phase. This complicates things on the parsing side, though, as it breaks the normal message parsing (i.e. what is needed to get the message headers, correlation_id, etc.).

Any suggestions? Perhaps there need to be additional middleware hooks for the final message serialization and the initial message deserialization, so that serialization formats that produce non-UTF-8 binary are supported?

The only other option I can think of is base64-encoding the binary before message serialization, which would kind of defeat the space-saving purpose of using a binary format.
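
To put a number on the base64 trade-off, here is a minimal standard-library sketch using the payload from the traceback. It only illustrates the size cost; the envelope dict mirrors the "data"/"headers" keys from RawMessage.encode, but the helper name `wrap_base64` is hypothetical and this is not how FastStream behaves today.

```python
import base64
import json

# The msgpack payload copied from the traceback locals above.
PAYLOAD = b"\x82\xa3url\xb1http://kelly.com/\xa4type\xa3far"


def wrap_base64(data: bytes, headers: dict) -> bytes:
    """Hypothetical workaround: make binary data JSON-safe via base64."""
    envelope = {
        "data": base64.b64encode(data).decode("ascii"),
        "headers": headers,
    }
    return json.dumps(envelope).encode("utf-8")


if __name__ == "__main__":
    encoded = base64.b64encode(PAYLOAD)
    # base64 emits 4 output characters for every 3 input bytes (plus padding).
    print("raw bytes:", len(PAYLOAD))
    print("base64 bytes:", len(encoded))
    print("growth: {:.1%}".format(len(encoded) / len(PAYLOAD) - 1))
```

The workaround does round-trip cleanly, but the roughly one-third size increase applies to every message, and the consumer has to know to decode the field.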

Related: #1255

@chrisgoddard chrisgoddard added the bug Something isn't working label Feb 5, 2025
@Lancetnik
Collaborator

You can pass the default redis-py options directly to the broker constructor to control this behavior:
https://github.com/airtai/faststream/blob/main/faststream/redis/broker/broker.py#L113-L117

@Lancetnik Lancetnik added the Redis Issues related to `faststream.redis` module and Redis features label Feb 5, 2025
@Lancetnik
Collaborator

Sorry, I only understood the problem just now. Here is a full MRE:

from msgpack import packb

from faststream import FastStream, Logger
from faststream.redis import RedisBroker

broker = RedisBroker()
app = FastStream(broker)

@broker.subscriber("tests")
async def handler(data, logger: Logger):
    logger.info(f"Received data: {data}")

@app.after_startup
async def start():
    await broker.publish(
        packb({"id": "12345678" * 4, "date": "2021-01-01T00:00:00Z"}),
        "tests",
    )

@Lancetnik Lancetnik moved this to Quick wins in FastStream Feb 22, 2025
@Lancetnik Lancetnik added the good first issue Good for newcomers label Feb 22, 2025
@Lancetnik
Collaborator

I think we should redesign our own message format for publishing.

@chrisgoddard
Author

Is the issue that you have to wrap whatever the client message is so that you get the necessary metadata you need on the receiving end?

@Lancetnik
Collaborator

Is the issue that you have to wrap whatever the client message is so that you get the necessary metadata you need on the receiving end?

Yeah, we have to use headers. That's why we made our own message format, but I designed it badly and you ran into a serialization problem. I think we should redesign RawMessage (its encode and parse methods) so that it can serialize any data.
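
One way such a redesign could look, using only the standard library: keep the headers as JSON but frame them with a length prefix, and append the body as raw bytes so it never passes through a text codec. This is a hypothetical sketch, not FastStream's actual or planned format; the layout and the names `encode_envelope` and `decode_envelope` are assumptions made for illustration.

```python
import json
import struct
from typing import Any, Dict, Tuple

# Hypothetical layout: [4-byte big-endian header length][JSON headers][raw body]
_HEADER_LEN = struct.Struct(">I")


def encode_envelope(data: bytes, headers: Dict[str, Any]) -> bytes:
    """Pack JSON headers and an arbitrary binary body into one bytes value."""
    header_blob = json.dumps(headers).encode("utf-8")
    return _HEADER_LEN.pack(len(header_blob)) + header_blob + data


def decode_envelope(raw: bytes) -> Tuple[bytes, Dict[str, Any]]:
    """Split an envelope back into (body, headers)."""
    if len(raw) < _HEADER_LEN.size:
        raise ValueError("envelope is shorter than the length prefix")
    (header_len,) = _HEADER_LEN.unpack_from(raw, 0)
    start = _HEADER_LEN.size
    end = start + header_len
    if end > len(raw):
        raise ValueError("truncated envelope: header length exceeds message size")
    headers = json.loads(raw[start:end].decode("utf-8"))
    return raw[end:], headers


if __name__ == "__main__":
    body = b"\x82\xa3url\xb1http://kelly.com/\xa4type\xa3far"
    raw = encode_envelope(body, {"correlation_id": "2ac5886a-736d-4712-b878-448b8b041f43"})
    print(decode_envelope(raw))
```

The body is stored byte for byte, so msgpack or protobuf payloads incur no base64 overhead, and no new dependency is needed. A real change would also have to keep parsing messages already published in the existing JSON format, which this sketch does not attempt.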

@Yakov-Varnaev
Contributor

I might be missing something. It looks like swapping json with msgpack is just enough to resolve the problem.

@Lancetnik
Collaborator

I might be missing something. It looks like swapping json with msgpack is just enough to resolve the problem.

It's not one of our dependencies, so we can't use it there.

@clippered

Encountered this while using protobuf so +1 on this too.

4 participants