|
| 1 | +from typing import List, Union, Generator, Iterator, Optional, AsyncGenerator |
| 2 | +from pydantic import BaseModel |
| 3 | +import os |
| 4 | +from langchain_core.prompts import ChatPromptTemplate |
| 5 | +from langchain_openai import ChatOpenAI |
| 6 | +from langchain_core.output_parsers.string import StrOutputParser |
| 7 | +import logging |
| 8 | +from pprint import pprint |
| 9 | + |
# Module-level logger; defined per convention but not referenced by the
# visible code below (the Pipeline methods use print() instead).
logger = logging.getLogger(__name__)
| 11 | + |
| 12 | + |
class Pipeline:
    """Open WebUI pipeline that streams a joke (plus an explanation of why it
    is funny) from an OpenAI chat model via a LangChain runnable chain."""

    class Valves(BaseModel):
        # Configuration surface exposed to the Open WebUI admin panel.
        OPENAI_API_KEY: str

    def __init__(self):
        # When True, inlet/outlet pretty-print the full request/response bodies.
        self.debug = False
        # Display name shown in the Open WebUI model list.
        self.name = "Langchain Pipeline"
        # Fall back to a placeholder key so the pipeline can load without the
        # environment variable; real requests will fail until a valid key is set.
        self.valves = self.Valves(
            **{"OPENAI_API_KEY": os.getenv("OPENAI_API_KEY", "mykey")}
        )

    async def on_startup(self):
        # Called when the server starts: export the configured key so that
        # langchain_openai (which reads OPENAI_API_KEY from the env) can use it.
        print(f"on_startup:{__name__}")
        os.environ["OPENAI_API_KEY"] = self.valves.OPENAI_API_KEY

    async def on_shutdown(self):
        # Called when the server stops; nothing to clean up here.
        print(f"on_shutdown:{__name__}")

    async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
        """Hook called before the OpenAI API request is made; may modify and
        must return the request ``body``."""
        print(f"inlet: {__name__}")
        if self.debug:
            print(f"inlet: {__name__} - body:")
            pprint(body)
            print(f"inlet: {__name__} - user:")
            pprint(user)
        return body

    async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
        """Hook called after the OpenAI API response completes; may modify and
        must return the response ``body``."""
        print(f"outlet: {__name__}")
        if self.debug:
            print(f"outlet: {__name__} - body:")
            pprint(body)
            print(f"outlet: {__name__} - user:")
            pprint(user)
        return body

    async def pipe(
        self, user_message: str, model_id: str, messages: List[dict], body: dict
    ) -> AsyncGenerator[str, None]:
        """Stream the model's answer chunk by chunk as an async generator.

        Fix: the original body did ``return chain.astream(...)`` from an
        ``async def``, which makes ``pipe`` a coroutine that merely *returns*
        an async generator (callers would have to ``await pipe(...)`` before
        iterating) — contradicting the declared
        ``AsyncGenerator[str, None]`` return type. Yielding each chunk here
        makes ``pipe`` itself the async generator, matching the annotation
        and the streaming contract.
        """
        llm = ChatOpenAI(model="gpt-4o-mini", temperature=0, streaming=True)
        prompt = ChatPromptTemplate.from_template(
            "tell me a joke about {topic}. Also explain me why it is funny in detail."
        )
        chain = prompt | llm | StrOutputParser()
        # NOTE(review): only user_message feeds the prompt; model_id, messages
        # and body are accepted for interface compatibility but unused.
        async for chunk in chain.astream({"topic": user_message}):
            yield chunk
0 commit comments