Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc aio server interceptor error with stream_stream calls #3066

Open
tonyay163 opened this issue Dec 4, 2024 · 2 comments
Open

grpc aio server interceptor error with stream_stream calls #3066

tonyay163 opened this issue Dec 4, 2024 · 2 comments
Labels
bug Something isn't working

Comments

@tonyay163
Copy link

tonyay163 commented Dec 4, 2024

Describe your environment

python = "^3.10"
grpcio = "1.64.3"
opentelemetry-distro = "0.48b0"
opentelemetry-instrumentation-grpc = "0.48b0"
protobuf = "5.28.1"

What happened?

Using aio server interceptor with a stream_stream call results in an error:

  File "[PYTHON]/lib/python3.10/site-packages/opentelemetry/instrumentation/grpc/_aio_server.py", line 144, in _stream_interceptor
    raise error
  File "[PYTHON]/lib/python3.10/site-packages/opentelemetry/instrumentation/grpc/_aio_server.py", line 135, in _stream_interceptor
    async for response in behavior(
TypeError: 'async for' requires an object with __aiter__ method, got coroutine

Steps to Reproduce

Use the grpc classes from the example: https://github.com/grpc/grpc/tree/e9c16ace65211b0f0d07ff2fdc7e657865fe8350/examples/python/async_streaming

import asyncio
import logging
from typing import AsyncIterable

import grpc
from opentelemetry.instrumentation.grpc import aio_server_interceptor

import phone_pb2
import phone_pb2_grpc


def create_state_response(
    call_state: phone_pb2.CallState.State,
) -> phone_pb2.StreamCallResponse:
    response = phone_pb2.StreamCallResponse()
    response.call_state.state = call_state
    return response


class Phone(phone_pb2_grpc.PhoneServicer):
    async def StreamCall(
        self,
        request_iterator: AsyncIterable[phone_pb2.StreamCallRequest],
        context: grpc.aio.ServicerContext,
    ):
        async for _ in request_iterator:
            await context.write(
                phone_pb2.StreamCallResponse()
            )


async def serve(address: str) -> None:
    server = grpc.aio.server(interceptors=[aio_server_interceptor()])
    phone_pb2_grpc.add_PhoneServicer_to_server(Phone(), server)
    server.add_insecure_port(address)
    await server.start()

    # Make a call to the server
    async def call_request_stream():
        yield phone_pb2.StreamCallRequest()
    async with grpc.aio.insecure_channel(address) as channel:
        stub = phone_pb2_grpc.PhoneStub(channel)
        async for _ in stub.StreamCall(call_request_stream()):
            pass

    await server.stop(grace=None)
    await server.wait_for_termination()


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    asyncio.run(serve("[::]:50051"))

Expected Result

No error

Actual Result

Error

Additional context

No response

Would you like to implement a fix?

None

@tonyay163 tonyay163 added the bug Something isn't working label Dec 4, 2024
@tonyay163
Copy link
Author

tonyay163 commented Dec 4, 2024

Removing this if block seems to work but I don't know why stream_stream calls do this and whether this breaks unary_stream calls:

# handle streaming responses specially
if response_streaming:
return self._intercept_aio_server_stream(
behavior,
handler_call_details,
)

@tonyay163 tonyay163 changed the title grpc aio server interceptor grpc aio server interceptor error with stream_stream calls Dec 4, 2024
@tonyay163
Copy link
Author

This seems to be fine if you use an async generator for the stream call instead of context.write, so just an edge case

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

No branches or pull requests

1 participant