Skip to content

feat: use sk threads for agent comms #205

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 9 additions & 95 deletions src/backend/app_config.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
# app_config.py
import os
import logging
from typing import Optional, List, Dict, Any
from dotenv import load_dotenv
from azure.identity import DefaultAzureCredential, ClientSecretCredential
from azure.cosmos.aio import CosmosClient
import os
from typing import Any, Dict, List, Optional

from azure.ai.projects.aio import AIProjectClient
from semantic_kernel.kernel import Kernel
from semantic_kernel.contents import ChatHistory
from azure.cosmos.aio import CosmosClient
from azure.identity import ClientSecretCredential, DefaultAzureCredential
from dotenv import load_dotenv
from semantic_kernel.agents import AzureAIAgentThread # pylint:disable=E0611
from semantic_kernel.agents.azure_ai.azure_ai_agent import AzureAIAgent
from semantic_kernel.contents import ChatHistory
from semantic_kernel.functions import KernelFunction
from semantic_kernel.kernel import Kernel

# Load environment variables from .env file
load_dotenv()
Expand Down Expand Up @@ -189,94 +191,6 @@ def get_ai_project_client(self):
logging.error("Failed to create AIProjectClient: %s", exc)
raise

async def create_azure_ai_agent(
self,
agent_name: str,
instructions: str,
tools: Optional[List[KernelFunction]] = None,
client=None,
response_format=None,
temperature: float = 0.0,
):
"""
Creates a new Azure AI Agent with the specified name and instructions using AIProjectClient.
If an agent with the given name (assistant_id) already exists, it tries to retrieve it first.

Args:
kernel: The Semantic Kernel instance
agent_name: The name of the agent (will be used as assistant_id)
instructions: The system message / instructions for the agent
agent_type: The type of agent (defaults to "assistant")
tools: Optional tool definitions for the agent
tool_resources: Optional tool resources required by the tools
response_format: Optional response format to control structured output
temperature: The temperature setting for the agent (defaults to 0.0)

Returns:
A new AzureAIAgent instance
"""
try:
# Get the AIProjectClient
if client is None:
client = self.get_ai_project_client()

# # ToDo: This is the fixed code but commenting it out as agent clean up is no happening yet
# # and there are multiple versions of agents due to testing
# # First try to get an existing agent with this name as assistant_id
# try:
# agent_id = None
# agent_list = await client.agents.list_agents()
# for agent in agent_list.data:
# if agent.name == agent_name:
# agent_id = agent.id
# break
# # If the agent already exists, we can use it directly
# # Get the existing agent definition
# existing_definition = await client.agents.get_agent(agent_id)
# # Create the agent instance directly with project_client and existing definition
# agent = AzureAIAgent(
# client=client,
# definition=existing_definition,
# plugins=tools,
# )

# client.agents.list_agents()

# return agent
# except Exception as e:
# # The Azure AI Projects SDK throws an exception when the agent doesn't exist
# # (not returning None), so we catch it and proceed to create a new agent
# if "ResourceNotFound" in str(e) or "404" in str(e):
# logging.info(
# f"Agent with ID {agent_name} not found. Will create a new one."
# )
# else:
# # Log unexpected errors but still try to create a new agent
# logging.warning(
# f"Unexpected error while retrieving agent {agent_name}: {str(e)}. Attempting to create new agent."
# )

# Create the agent using the project client with the agent_name as both name and assistantId
agent_definition = await client.agents.create_agent(
model=self.AZURE_OPENAI_DEPLOYMENT_NAME,
name=agent_name,
instructions=instructions,
temperature=temperature,
response_format=response_format,
)

# Create the agent instance directly with project_client and definition
agent = AzureAIAgent(
client=client,
definition=agent_definition,
plugins=tools,
)

return agent
except Exception as exc:
logging.error("Failed to create Azure AI Agent: %s", exc)
raise


# Create a global instance of AppConfig
config = AppConfig()
171 changes: 106 additions & 65 deletions src/backend/kernel_agents/agent_base.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,22 @@
import json
import logging
import os
from typing import Any, Awaitable, Callable, Dict, List, Mapping, Optional, Union
from abc import ABC, abstractmethod
from typing import (Any, Awaitable, Callable, Dict, List, Mapping, Optional,
Union)

import semantic_kernel as sk
from semantic_kernel.agents.azure_ai.azure_ai_agent import AzureAIAgent
from semantic_kernel.functions import KernelFunction
from semantic_kernel.functions.kernel_arguments import KernelArguments
from semantic_kernel.functions.kernel_function_decorator import kernel_function
from semantic_kernel.agents import AzureAIAgentThread


# Import the new AppConfig instance
from app_config import config
from context.cosmos_memory_kernel import CosmosMemoryContext
from event_utils import track_event_if_configured
from models.messages_kernel import (
ActionRequest,
ActionResponse,
AgentMessage,
Step,
StepStatus,
)
from models.messages_kernel import (ActionRequest, ActionResponse,
AgentMessage, Step, StepStatus)
from semantic_kernel.agents import AzureAIAgentThread # pylint:disable=E0611
from semantic_kernel.agents.azure_ai.azure_ai_agent import AzureAIAgent
from semantic_kernel.functions import KernelFunction
from semantic_kernel.functions.kernel_arguments import KernelArguments
from semantic_kernel.functions.kernel_function_decorator import kernel_function

# Default formatting instructions used across agents
DEFAULT_FORMATTING_INSTRUCTIONS = "Instructions: returning the output of this function call verbatim to the user in markdown. Then write AGENT SUMMARY: and then include a summary of what you did."
Expand All @@ -40,6 +35,7 @@ def __init__(
system_message: Optional[str] = None,
client=None,
definition=None,
thread: AzureAIAgentThread=None,
):
"""Initialize the base agent.

Expand All @@ -65,6 +61,7 @@ def __init__(
endpoint=None, # Set as needed
api_version=None, # Set as needed
token=None, # Set as needed
model=config.AZURE_OPENAI_DEPLOYMENT_NAME,
agent_name=agent_name,
system_prompt=system_message,
client=client,
Expand All @@ -79,42 +76,17 @@ def __init__(
self._tools = tools
self._system_message = system_message
self._chat_history = [{"role": "system", "content": self._system_message}]
self._agent = None # Will be initialized in async_init
self._message = None # Will be initialized in handle_action_request
self._thread = thread

# Required properties for AgentGroupChat compatibility
self.name = agent_name # This is crucial for AgentGroupChat to identify agents

# @property
# def plugins(self) -> Optional[dict[str, Callable]]:
# """Get the plugins for this agent.

# Returns:
# A list of plugins, or None if not applicable.
# """
# return None
@staticmethod
def default_system_message(agent_name=None) -> str:
name = agent_name
return f"You are an AI assistant named {name}. Help the user by providing accurate and helpful information."

async def async_init(self):
"""Asynchronously initialize the agent after construction.

This method must be called after creating the agent to complete initialization.
"""
logging.info(f"Initializing agent: {self._agent_name}")
# Create Azure AI Agent or fallback
if not self._agent:
self._agent = await config.create_azure_ai_agent(
agent_name=self._agent_name,
instructions=self._system_message,
tools=self._tools,
)
else:
logging.info(f"Agent {self._agent_name} already initialized.")
# Tools are registered with the kernel via get_tools_from_config
return self

async def handle_action_request(self, action_request: ActionRequest) -> str:
"""Handle an action request from another agent or the system.

Expand All @@ -137,32 +109,17 @@ async def handle_action_request(self, action_request: ActionRequest) -> str:
status=StepStatus.failed,
message="Step not found in memory.",
)
return response.json()
return response.model_dump_json()

# Add messages to chat history for context
# This gives the agent visibility of the conversation history
self._chat_history.extend(
[
{"role": "assistant", "content": action_request.action},
{
"role": "user",
"content": f"{step.human_feedback}. Now make the function call",
},
]
)
self._message = [
action_request.action
]

try:
# Use the agent to process the action
# chat_history = self._chat_history.copy()

# Call the agent to handle the action
thread = None
# thread = self.client.agents.get_thread(
# thread=step.session_id
# ) # AzureAIAgentThread(thread_id=step.session_id)
async_generator = self._agent.invoke(
messages=f"{str(self._chat_history)}\n\nPlease perform this action",
thread=thread,
# Use the agent to process the action request
async_generator = self.invoke(
messages=self._message,
thread=self._thread,
)

response_content = ""
Expand All @@ -172,6 +129,10 @@ async def handle_action_request(self, action_request: ActionRequest) -> str:
if chunk is not None:
response_content += str(chunk)

# Log the messages in the thread
# async for msg in self._thread.get_messages():
# logging.info(f"thread messages - role:{msg.role} - content:{msg.content}")

logging.info(f"Response content length: {len(response_content)}")
logging.info(f"Response content: {response_content}")

Expand Down Expand Up @@ -264,3 +225,83 @@ def save_state(self) -> Mapping[str, Any]:
def load_state(self, state: Mapping[str, Any]) -> None:
"""Load the state of this agent."""
self._memory_store.load_state(state["memory"])

@classmethod
@abstractmethod
async def create(cls, **kwargs) -> "BaseAgent":
"""Create an instance of the agent."""
pass

@staticmethod
async def _create_azure_ai_agent_definition(
agent_name: str,
instructions: str,
tools: Optional[List[KernelFunction]] = None,
client=None,
response_format=None,
temperature: float = 0.0,
):
"""
Creates a new Azure AI Agent with the specified name and instructions using AIProjectClient.
If an agent with the given name (assistant_id) already exists, it tries to retrieve it first.

Args:
kernel: The Semantic Kernel instance
agent_name: The name of the agent (will be used as assistant_id)
instructions: The system message / instructions for the agent
agent_type: The type of agent (defaults to "assistant")
tools: Optional tool definitions for the agent
tool_resources: Optional tool resources required by the tools
response_format: Optional response format to control structured output
temperature: The temperature setting for the agent (defaults to 0.0)

Returns:
A new AzureAIAgent definition or an existing one if found
"""
try:
# Get the AIProjectClient
if client is None:
client = config.get_ai_project_client()

# # First try to get an existing agent with this name as assistant_id
try:
agent_id = None
agent_list = await client.agents.list_agents()
for agent in agent_list.data:
if agent.name == agent_name:
agent_id = agent.id
break
# If the agent already exists, we can use it directly
# Get the existing agent definition
if agent_id is not None:
logging.info(f"Agent with ID {agent_id} exists.")

existing_definition = await client.agents.get_agent(agent_id)

return existing_definition
except Exception as e:
# The Azure AI Projects SDK throws an exception when the agent doesn't exist
# (not returning None), so we catch it and proceed to create a new agent
if "ResourceNotFound" in str(e) or "404" in str(e):
logging.info(
f"Agent with ID {agent_name} not found. Will create a new one."
)
else:
# Log unexpected errors but still try to create a new agent
logging.warning(
f"Unexpected error while retrieving agent {agent_name}: {str(e)}. Attempting to create new agent."
)

# Create the agent using the project client with the agent_name as both name and assistantId
agent_definition = await client.agents.create_agent(
model=config.AZURE_OPENAI_DEPLOYMENT_NAME,
name=agent_name,
instructions=instructions,
temperature=temperature,
response_format=response_format,
)

return agent_definition
except Exception as exc:
logging.error("Failed to create Azure AI Agent: %s", exc)
raise
Loading
Loading