
dify connection #377

Open
chengleilovesky opened this issue Aug 14, 2024 · 5 comments
Comments

@chengleilovesky

I implemented Dify as an LLM service.

import json

import aiohttp

# pipecat imports (LLMService, frame classes, logger, etc.) omitted, as in the original


class DifyHttpService(LLMService):
    """DifyHttpService sends asynchronous HTTP requests with aiohttp and parses the response data.

    This service consumes OpenAILLMContextFrame frames, which contain a reference to
    an OpenAILLMContext object. The OpenAILLMContext defines the context that is sent
    to the LLM for completion. This includes user, assistant and system messages, as
    well as the tool choice and the tools used (if a function call is requested from
    the LLM).
    """

    def __init__(self, api_key: str, base_url: str = "http://localhost/v1", **kwargs):
        super().__init__(**kwargs)
        self.api_key = api_key
        self.base_url = base_url

    async def send_chat_request(self, query: str, response_mode: str = "streaming",
                                conversation_id: str = "", user: str = "abc-123"):
        url = f"{self.base_url}/chat-messages"
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }
        data = {
            "inputs": {},
            "query": query,
            "response_mode": response_mode,
            "conversation_id": conversation_id,
            "user": user,
        }

        async with aiohttp.ClientSession() as session:
            try:
                async with session.post(url, headers=headers, json=data) as response:
                    response.raise_for_status()
                    # Dify streams server-sent events; each payload line is
                    # prefixed with "data: " followed by a JSON object.
                    async for line in response.content:
                        decoded_line = line.decode("utf-8").strip()
                        if decoded_line.startswith("data: "):
                            json_data = decoded_line[6:]
                            parsed_data = self.parse_message_data(json_data)
                            if parsed_data.get("answer") not in [None, ""]:
                                await self.push_frame(TextFrame(parsed_data.get("answer")))
            except aiohttp.ClientResponseError as e:
                logger.error(f"HTTP error occurred: {e.status} - {e.message}")
            except Exception as e:
                logger.error(f"An error occurred: {e}")
                return None

    async def _process_context(self, context: OpenAILLMContext):
        await self.push_frame(LLMFullResponseStartFrame())
        logger.debug(f"Generating chat: {context.get_messages_json()}")
        query = self._get_messages_from_str(context)
        await self.start_ttfb_metrics()
        await self.send_chat_request(query)
        await self.stop_ttfb_metrics()
        await self.push_frame(LLMFullResponseEndFrame())

    async def process_frame(self, frame: Frame, direction: FrameDirection):
        await super().process_frame(frame, direction)

        context = None

        if isinstance(frame, OpenAILLMContextFrame):
            context: OpenAILLMContext = frame.context
        elif isinstance(frame, LLMMessagesFrame):
            context = OpenAILLMContext.from_messages(frame.messages)
        elif isinstance(frame, VisionImageRawFrame):
            context = OpenAILLMContext.from_image_frame(frame)
        elif isinstance(frame, LLMModelUpdateFrame):
            logger.debug(f"Switching LLM model to: [{frame.model}]")
            self._create_client(frame.model)
        else:
            await self.push_frame(frame, direction)

        if context:
            await self._process_context(context)

    def parse_message_data(self, json_data: str) -> dict:
        """Parse the JSON data returned by the server and extract the useful fields.

        :param json_data: JSON-formatted string data
        :return: dictionary containing the extracted fields
        """
        try:
            # Parse the JSON string into a Python dictionary
            data = json.loads(json_data)

            # Extract the useful fields
            return {
                "event": data.get("event"),
                "conversation_id": data.get("conversation_id"),
                "message_id": data.get("message_id"),
                "created_at": data.get("created_at"),
                "task_id": data.get("task_id"),
                "id": data.get("id"),
                "answer": data.get("answer"),
            }
        except json.JSONDecodeError as e:
            logger.error(f"Failed to decode JSON: {e}")
            return {}

    def _get_messages_from_str(self, context: OpenAILLMContext) -> str:
        # Dify takes a single query string, so only the content of the most
        # recent message in the context is sent.
        openai_messages = context.get_messages()
        contents = []
        logger.debug(openai_messages)
        for message in openai_messages:
            contents.append(message["content"])
        return contents[-1]
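For anyone wanting to verify the SSE-parsing part of `send_chat_request` and `parse_message_data` without a live Dify server, here is a minimal, self-contained sketch of the same logic in plain Python (no aiohttp, no pipecat). The sample byte strings below are hypothetical stream chunks in the `data: `-prefixed JSON-per-line format the code above expects:

```python
import json

def parse_sse_line(raw: bytes) -> dict:
    """Decode one server-sent-events line and extract the Dify fields,
    mirroring parse_message_data above. Returns {} for non-data lines."""
    decoded = raw.decode("utf-8").strip()
    if not decoded.startswith("data: "):
        return {}
    try:
        data = json.loads(decoded[6:])
    except json.JSONDecodeError:
        return {}
    return {k: data.get(k) for k in
            ("event", "conversation_id", "message_id", "created_at",
             "task_id", "id", "answer")}

# Simulated stream chunks (hypothetical, for illustration only)
stream = [
    b'data: {"event": "message", "answer": "Hello", "conversation_id": "c1"}\n',
    b'\n',
    b'data: {"event": "message", "answer": ", world", "conversation_id": "c1"}\n',
    b'data: {"event": "message_end", "conversation_id": "c1"}\n',
]

# Concatenate only non-empty answers, as the service does when pushing TextFrames
answer = "".join(p["answer"] for line in stream
                 if (p := parse_sse_line(line)) and p.get("answer"))
print(answer)  # Hello, world
```

Feeding each of these chunks through `parse_sse_line` reproduces what the `async for line in response.content` loop does, which makes it easy to unit-test the parsing separately from the HTTP transport.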
@chengleilovesky
Author

In the WebSocket server case, the user-speech interruption feature doesn't work particularly well. I found that the audio is transmitted too fast to be interrupted. Could this cause two segments of speech to be output simultaneously? Interruption in Daily works very well.

@ramishi

ramishi commented Aug 16, 2024

Do you have an example bot.py showing how to use this? I have a similar use case and would like to try your implementation.

@aconchillo
Contributor

There's PR #378 from @ramishi.

@chengleilovesky
Author

Do you have an example bot.py showing how to use this? I have a similar use case and would like to try your implementation.

How can I give you an example?

@ramishi

ramishi commented Aug 20, 2024

Do you have an example bot.py showing how to use this? I have a similar use case and would like to try your implementation.

How can I give you an example?

you can post it here please @chengleilovesky or we can PM if it's better for you.
