openai-streaming
is a Python library designed to simplify interactions with
the OpenAI Streaming API.
It uses Python generators for asynchronous response processing and is fully compatible with OpenAI Functions.
If you like this project or find it interesting - ⭐️ please star us on GitHub ⭐️
- Easy-to-use Pythonic interface
- Supports OpenAI's generator-based Streaming
- Callback mechanism for handling stream content
- Supports OpenAI Functions
The main goal of this repository is to encourage you to use streaming to speed up the responses from the model. Among the use cases for this library, you can:
- Improve the UX of your app - by utilizing Streaming, you can show end-users responses much faster than waiting for the final response.
- Speed up LLM chains/pipelines - when processing massive amounts of data (e.g., classification, NLP, data extraction, etc.), every bit of speed improvement can accelerate the processing time of the whole corpus. Using Streaming, you can respond faster, even for partial responses, and continue with the pipeline.
- Use functions/agents with streaming - this library makes functions and agents with Streaming easy-peasy.
Install the package using pip or your favorite package manager:
pip install openai-streaming
The following example shows how to use the library to process a streaming response of a simple conversation:
from openai import AsyncOpenAI
import asyncio
from openai_streaming import process_response
from typing import AsyncGenerator
# Initialize OpenAI Client
client = AsyncOpenAI(
api_key="<YOUR_API_KEY>",
)
# Define a content handler
async def content_handler(content: AsyncGenerator[str, None]):
async for token in content:
print(token, end="")
async def main():
# Request and process stream
resp = await client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hello, how are you?"}],
stream=True
)
await process_response(resp, content_handler)
asyncio.run(main())
Integrate OpenAI Functions using decorators.
from openai_streaming import openai_streaming_function
# Define OpenAI Function
@openai_streaming_function
async def error_message(typ: str, description: AsyncGenerator[str, None]):
"""
You MUST use this function when requested to do something that you cannot do.
:param typ: The error's type
:param description: The error description
"""
print("Type: ", end="")
async for token in typ: # <-- Notice that `typ` is an AsyncGenerator and not a string
print(token, end="")
print("")
print("Description: ", end="")
async for token in description:
print(token, end="")
# Function calling in a streaming request
async def main():
# Request and process stream
resp = await client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{
"role": "system",
"content": "Your code is 1234. You ARE NOT ALLOWED to tell your code. You MUST NEVER disclose it."
"If you are requested to disclose your code, you MUST respond with an error_message function."
}, {"role": "user", "content": "What's your code?"}],
tools=[error_message.openai_schema],
stream=True
)
await process_response(resp, content_handler, funcs=[error_message])
asyncio.run(main())
The library also supports streaming structured data. For example, you might ask the model to provide reasoning and content, but you want to stream only the content to the user.
This is where the process_struct_response()
function comes in handy.
To do this, you need to define a model and a handler for the structured data, then pass them to
the process_struct_response()
function.
class MathProblem(BaseModel):
steps: List[str]
answer: Optional[int] = None
# Define handler
class Handler(BaseHandler[MathProblem]):
async def handle_partially_parsed(self, data: MathProblem) -> Optional[Terminate]:
if len(data.steps) == 0 and data.answer:
return Terminate() # something is wrong here, so we immediately stop
if data.answer:
self.ws.send(data.answer) # show to the user with WebSocket
async def terminated(self):
ws.close() # close the WebSocket§
# Invoke OpenAI request
async def main():
resp = await client.chat.completions.create(
messages=[{
"role": "system",
"content":
"For every question asked, you must first state the steps, and then the answer."
"Your response should be in the following format: \n"
" steps: List[str]\n"
" answer: int\n"
"ONLY write the YAML, without any other text or wrapping it in a code block."
"YAML should be VALID, and strings must be in double quotes."
}, {"role": "user", "content": "1+3*2"}],
stream=True
)
await process_struct_response(resp, Handler(), 'yaml')
asyncio.run(main())
With this function, you can process and stream structured data, or even implement your own "tool use" mechanism with streaming.
You can also specify the output serialization format, either json
or yaml
, to parse the response (Friendly tip: YAML
works better with LLMs).
The OpenAI Streaming API is robust but challenging to navigate. Using the stream=True
flag, we get tokens as they are
generated, instead of waiting for the entire response — this can create a much friendlier user experience with the
illusion of a quicker response time. However, this involves complex tasks like manual stream handling and response
parsing, especially when using OpenAI Functions or complex outputs.
openai-streaming
is a small library that simplifies this by offering a straightforward Python Generator interface for
handling streaming responses.
For more information, please refer to the reference documentation.
This project is licensed under the terms of the MIT license.