Skip to content

Feature/fix agent status reporting #28

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion src/chorus/agents/meta.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@

from typing import Any, Tuple
from chorus.config.registrable import Registrable
from chorus.util.agent_naming import get_unique_agent_name
import uuid
Expand Down Expand Up @@ -127,4 +128,13 @@ def get_agent_class_uuid(self) -> str:
hash_obj = hashlib.md5(agent_str.encode())
self._agent_uuid = str(uuid.UUID(hex=hash_obj.hexdigest()))
return self._agent_uuid



def get_init_args(self) -> Tuple[Any, ...]:
"""
Get the initialization arguments for the agent.

Returns:
A tuple containing the initialization arguments.
"""
return self._init_args, self._init_kwargs
2 changes: 1 addition & 1 deletion src/chorus/agents/passive_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def iterate(
state.processed_messages.add(new_incoming_msg.message_id)
context.report_status(context.agent_id, AgentStatus.BUSY)
downstream_state = self.respond(context, state, new_incoming_msg)
context.report_status(context.agent_id, AgentStatus.AVAILABLE)
context.report_status(context.agent_id, AgentStatus.IDLE)
return downstream_state
else:
return state
113 changes: 109 additions & 4 deletions src/chorus/communication/message_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
import uuid
import zmq
import random
from typing import Dict, List, Optional, Set, ClassVar
from typing import Dict, List, Optional, Set, ClassVar, Tuple

from pydantic import BaseModel, Field

from chorus.data.dialog import Message
from chorus.communication.zmq_protocol import MessageType, ZMQMessage
from chorus.data.agent_status import AgentStatus, AgentStatusRecord

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -63,6 +64,8 @@ def __init__(self, port: int = DEFAULT_ROUTER_PORT, max_retry: int = 5):
self._thread = None
self._agent_identities = {}
self._global_message_ids = set()
self._agent_state_map = {} # Store the latest state from each agent
self._agent_status_map = {} # Store the latest status from each agent (now using AgentStatusRecord)

def start(self):
"""Start the message router in a background thread."""
Expand Down Expand Up @@ -145,6 +148,9 @@ def _process_message(self, identity: bytes, message_data: bytes):
elif msg_type == MessageType.STATE_UPDATE:
# Agent is reporting its state
self._handle_state_update(identity, zmq_message)
elif msg_type == MessageType.STATUS_UPDATE:
# Agent is reporting its status
self._handle_status_update(identity, zmq_message)
elif msg_type == MessageType.HEARTBEAT:
# Heartbeat response
self._send_to_agent(identity, ZMQMessage(
Expand Down Expand Up @@ -241,9 +247,52 @@ def _handle_state_update(self, identity: bytes, zmq_message: ZMQMessage):
identity: ZMQ identity of the agent
zmq_message: Message containing the agent's state
"""
# This is handled at the router level (Chorus class)
# The router needs to update the agent's state in its RunnerState
pass
agent_id = zmq_message.agent_id
payload = zmq_message.payload

if "state" not in payload:
logger.error(f"State update from agent {agent_id} missing state data")
return

# Store the agent's state in the state map
self._agent_state_map[agent_id] = payload["state"]
logger.debug(f"Updated state for agent {agent_id}")

# This is also handled at the runner level (Chorus class)
# The parent context can access this state map

def _handle_status_update(self, identity: bytes, zmq_message: ZMQMessage):
"""Handle agent status update message.

Args:
identity: ZMQ identity of the agent
zmq_message: Message containing the agent's status
"""
agent_id = zmq_message.agent_id
payload = zmq_message.payload

if "status" not in payload:
logger.error(f"Status update from agent {agent_id} missing status data")
return

# Store the agent's status in the status map using AgentStatusRecord
try:
status_str = payload["status"]
status = AgentStatus(status_str) # Convert string to enum
current_timestamp = int(time.time())
Copy link
Preview

Copilot AI Apr 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function uses time.time() but there is no 'import time' statement in this file. Please add 'import time' at the top of the file to ensure proper execution.

Copilot is powered by AI, so mistakes are possible. Review output carefully before use.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

import time is Line 4


# Create a new AgentStatusRecord with current timestamp
status_record = AgentStatusRecord(
status=status,
last_active_timestamp=current_timestamp
)

# Store the record
self._agent_status_map[agent_id] = status_record

logger.debug(f"Updated status for agent {agent_id} to {status} at {current_timestamp}")
except ValueError as e:
logger.error(f"Invalid status value for agent {agent_id}: {payload['status']} - {e}")

def _send_to_agent(self, identity: bytes, zmq_message: ZMQMessage):
"""Send a ZMQ message to an agent.
Expand Down Expand Up @@ -363,6 +412,46 @@ def _is_agent_in_channel(self, agent_id: str, channel: str) -> bool:
# This is a stub - actual implementation needs channel info from global context
return True # Default to True for simple routing

def get_agent_state_map(self) -> Dict:
"""Get the current state map of all agents.

Returns:
Dictionary mapping agent IDs to their latest state
"""
return self._agent_state_map.copy()

def get_agent_state(self, agent_id: str) -> Optional[Dict]:
"""Get the current state of a specific agent.

Args:
agent_id: ID of the agent whose state to retrieve

Returns:
The agent's state if found, None otherwise
"""
return self._agent_state_map.get(agent_id, None)

def get_agent_status(self, agent_id: str) -> Optional[AgentStatus]:
"""Get the current status of a specific agent.

Args:
agent_id: ID of the agent whose status to retrieve

Returns:
The agent's status if found, None otherwise
"""
if agent_id in self._agent_status_map:
return self._agent_status_map[agent_id].status
return None

def get_agent_status_map(self) -> Dict[str, AgentStatusRecord]:
"""Get the current status map of all agents with timestamps.

Returns:
Dictionary mapping agent IDs to their latest status record with timestamp
"""
return self._agent_status_map.copy()


class ChorusMessageClient:
"""
Expand Down Expand Up @@ -662,6 +751,22 @@ def set_handler(obj):
except Exception as e:
logger.error(f"Error sending state update: {e}")

def send_status_update(self, status: str):
"""Send agent status update to the router.

Args:
status: String representation of agent status (from AgentStatus enum)
"""
try:
self._send_to_router(ZMQMessage(
msg_type=MessageType.STATUS_UPDATE,
agent_id=self.agent_id,
payload={"status": status}
))
logger.debug(f"Agent {self.agent_id} sent status update: {status}")
except Exception as e:
logger.error(f"Error sending status update: {e}")

def send_stop_ack(self):
"""Send acknowledgment of stop request to the router."""
self._send_to_router(ZMQMessage(
Expand Down
3 changes: 3 additions & 0 deletions src/chorus/communication/zmq_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ class MessageType(str, Enum):
# Team management
TEAM_INFO = "team_info"

# Status tracking
STATUS_UPDATE = "status_update"

# Control signals
STOP = "stop"
STOP_ACK = "stop_ack"
Expand Down
Loading
Loading