Stream large file to AsyncApp (application/octet-stream) #1877

Open
alfechner opened this issue Feb 13, 2024 · 7 comments

Comments

@alfechner
Contributor

Description

I would like to stream a large file to my AsyncApp. My app is supposed to proxy the incoming bytes to another service. The goal is to reduce the memory footprint.

My spec looks like this:

openapi: 3.0.3
info:
  title: LIQUID Dataset Service
  version: 1.0.0
paths:
  /foo:
    post:
      x-openapi-router-controller: my_controller
      operationId: my_controller_function
      requestBody:
        required: true
        content:
          application/octet-stream:
            schema:
              type: string
              format: binary

I receive the bytes of my file in my_controller_function, but they are already fully loaded: the entire file is in memory by the time the controller runs.

Is there any way to get hold of the stream in the controller before the entire content has been transferred?

  • Python 3.12.1
  • Version: 3.0.5
@RobbeSneyders
Member

Hi @alfechner,

I think you have two options:

  • Use a different content type like multipart/form-data instead.

    requestBody:
      content:
        multipart/form-data:
          schema:
            type: object
            properties:
              file:
                type: string
                format: binary

    and accept the file parameter in your handler function:

    def post(file):
        ...

    This will give you a starlette.UploadFile object.

  • Access the body via the request instead:

    from connexion import request
    
    def post():
        stream = request.stream()

    Check the starlette docs for more info on how to access the request.

    It's important to not accept the body parameter in your handler function, since that triggers the materialization of the stream by connexion.

Agreed, this could be clearer in the docs.

@alfechner
Contributor Author

Hi @RobbeSneyders, really appreciate your helpful input here.

The multipart/form-data option didn't work for me. I looked into the code and I believe that starlette's form parser reads the entire stream into memory before passing the starlette.UploadFile to the controller. I found this comment confirming my finding.

I made the stream option work:

async def test_stream():
    request = Request(scope, receive)
    async for chunk in request.stream():
        ...

Just a note: when the body parameter is passed to test_stream, the stream() function runs into an infinite loop. The _stream_consumed property is not set correctly even though the stream is already empty. I assume this is somehow related to the copying of the Request object.
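
The stream option can be sketched as a small helper that forwards each chunk as it arrives, so that only one chunk is held in memory at a time. This is a minimal sketch, not connexion code: forward_chunks, send and fake_stream are hypothetical names, and fake_stream merely stands in for request.stream() so the example runs without a server.

```python
import asyncio
import hashlib
from typing import AsyncIterator, Awaitable, Callable


async def forward_chunks(
    chunks: AsyncIterator[bytes],
    send: Callable[[bytes], Awaitable[None]],
) -> tuple[int, str]:
    """Forward each chunk as it arrives; return (byte count, SHA-256 hex digest)."""
    digest = hashlib.sha256()
    total = 0
    async for chunk in chunks:
        if not chunk:
            continue  # skip empty chunks (Starlette ends the stream with b"")
        await send(chunk)  # hand the chunk on before reading the next one
        digest.update(chunk)
        total += len(chunk)
    return total, digest.hexdigest()


# Hypothetical stand-in for `request.stream()`.
async def fake_stream() -> AsyncIterator[bytes]:
    for part in (b"hello ", b"", b"world"):
        yield part


async def demo() -> tuple[int, str, list[bytes]]:
    received: list[bytes] = []

    async def send(chunk: bytes) -> None:
        # A real proxy would write to the upstream service here.
        received.append(chunk)

    total, hexdigest = await forward_chunks(fake_stream(), send)
    return total, hexdigest, received
```

In a real handler, the first argument would presumably be request.stream() from connexion.context, the handler would not declare a body parameter, and send would write to the other service.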

@RobbeSneyders
Member

Ok, thanks for diving into this and reporting back.

Any reason you're creating a new request instead of importing the one we make available via from connexion import request?

@alfechner
Contributor Author

No, you're right. It works exactly the same way with from connexion.context import request. The code above is a leftover from my attempt to work around the infinite loop issue I mentioned, but it doesn't change anything at all.

@RobbeSneyders
Member

Is the infinite loop issue you mention the same as described in #1812?

@alfechner
Contributor Author

alfechner commented Feb 15, 2024

Is the infinite loop issue you mention the same as described in #1812?

Looks like it, yes 👍

@alfechner
Contributor Author

I wanted to have another look at the multipart/form-data option. I have a request body with (1) a file and (2) some metadata for that file:

requestBody:
  required: true
  content:
    multipart/form-data:
      schema:
        type: object
        properties:
          file:
            type: string
            format: binary
          metadata:
            type: object
            additionalProperties: true
      encoding:
        metadata:
          contentType: application/json

I wanted to see if I can modify starlette's MultiPartParser so that I can intercept the stream while the bytes of the file are being read and stream them to the other server. The metadata would only be available after reading the entire stream (and sending it off), which would be fine in my case.

I tried to run the original MultiPartParser on the request to get started:

from connexion.context import request
from starlette.formparsers import MultiPartParser

async def test_form_data():
    form_data = await MultiPartParser(headers=request.headers, stream=request.stream()).parse()

I'm running into an infinite loop again.

Unlike the application/octet-stream example, there is a validator registered for the body type multipart/form-data. This means the validation takes place before the controller is invoked and "materializes" the stream.

To disable the validation, I unset the body validator for multipart/form-data via the validator_map setting and verified that the validation is skipped.

Even though there is no validation anymore, I'm still stuck in the infinite loop, and I found that the parameter decorator consumes the stream when loading the files.

Would it make sense to add a _maybe_get_files method like _maybe_get_body? What do you think?
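
The idea can be illustrated with a minimal sketch, assuming that _maybe_get_body decides from the handler's signature whether to read the body (as the earlier remark about the body parameter suggests). This is not connexion's actual code: wants_any, maybe_get_files, FakeRequest and both handlers are hypothetical names used only for illustration.

```python
import asyncio
import inspect
from typing import Any, Callable, Iterable, Optional


def wants_any(handler: Callable[..., Any], names: Iterable[str]) -> bool:
    """True if the handler declares one of `names` or accepts **kwargs."""
    wanted = set(names)
    params = inspect.signature(handler).parameters.values()
    return any(
        p.name in wanted or p.kind is inspect.Parameter.VAR_KEYWORD
        for p in params
    )


class FakeRequest:
    """Hypothetical stand-in that records whether the form was parsed."""

    def __init__(self) -> None:
        self.files_read = False

    async def files(self) -> dict:
        self.files_read = True  # parsing the form consumes the stream
        return {"file": b"payload"}


async def maybe_get_files(
    request: FakeRequest,
    handler: Callable[..., Any],
    file_names: Iterable[str],
) -> Optional[dict]:
    """Parse uploaded files only if the handler asks for one of them."""
    if wants_any(handler, file_names):
        return await request.files()
    return None  # stream left untouched for the handler to consume


def upload_handler(file):  # declares `file`, so the form is parsed
    ...


def streaming_handler():  # declares nothing, so the stream stays intact
    ...
```

With such a check, a handler that omits the file parameter would still find the stream unread and could pass it to its own parser.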


2 participants