-
-
Notifications
You must be signed in to change notification settings - Fork 754
Description
add token metrics, but try to optimise and make minimal code change on the main core file
mostly try to keep this feature away from the core application and capable of being included only if required.
This shouldn't decrease the processing speed of the agent
Todo List
- [x] Analyze praisonaiagents structure using multi agents
- [x] Generate diff for token collector module (telemetry/token_collector.py)
- [x] Generate diff for LLM integration (llm/llm.py)
- [x] Generate diff for TaskOutput enhancement (main.py)
- [x] Generate diff for Agents integration (agents/agents.py)
- [x] Generate diff for telemetry integration (telemetry/token_telemetry.py)
- [x] Display all diffs in response
Token Tracking Implementation Diffs
1. New File:
praisonaiagents/telemetry/token_collector.py+++ b/praisonaiagents/telemetry/token_collector.py
@@ -0,0 +1,166 @@
+"""
+Token usage collector for tracking LLM token consumption.
+Provides comprehensive tracking.
+"""
+
+from dataclasses import dataclass, field
+from typing import Dict, Optional, List
+from datetime import datetime
+import threading
+from collections import defaultdict
+
+
@dataclass
class TokenMetrics:
    """Represents token usage metrics for a single LLM interaction.

    All counters default to 0 so a freshly-constructed instance is a valid
    additive identity for aggregation.
    """
    input_tokens: int = 0
    output_tokens: int = 0
    cached_tokens: int = 0
    reasoning_tokens: int = 0
    audio_input_tokens: int = 0
    audio_output_tokens: int = 0

    @property
    def total_tokens(self) -> int:
        """Calculate total tokens across all types."""
        return (self.input_tokens
                + self.output_tokens
                + self.cached_tokens
                + self.reasoning_tokens
                + self.audio_input_tokens
                + self.audio_output_tokens)

    def to_dict(self) -> Dict[str, int]:
        """Convert to dictionary format (includes the derived total)."""
        return {
            "input_tokens": self.input_tokens,
            "output_tokens": self.output_tokens,
            "cached_tokens": self.cached_tokens,
            "reasoning_tokens": self.reasoning_tokens,
            "audio_input_tokens": self.audio_input_tokens,
            "audio_output_tokens": self.audio_output_tokens,
            "total_tokens": self.total_tokens,
        }

    # Must be __add__ (not plain `add`): the rest of the patch aggregates with
    # the + operator, e.g. `self.total_metrics = self.total_metrics + metrics`.
    def __add__(self, other: 'TokenMetrics') -> 'TokenMetrics':
        """Add two TokenMetrics instances together, returning a new instance."""
        if not isinstance(other, TokenMetrics):
            return NotImplemented
        return TokenMetrics(
            input_tokens=self.input_tokens + other.input_tokens,
            output_tokens=self.output_tokens + other.output_tokens,
            cached_tokens=self.cached_tokens + other.cached_tokens,
            reasoning_tokens=self.reasoning_tokens + other.reasoning_tokens,
            audio_input_tokens=self.audio_input_tokens + other.audio_input_tokens,
            audio_output_tokens=self.audio_output_tokens + other.audio_output_tokens,
        )

    # Backward-compatible alias so explicit metrics.add(other) also works.
    add = __add__
@dataclass
class SessionTokenMetrics:
    """Aggregated token metrics for a session.

    Tracks totals overall, per model name, and per agent name. Aggregation
    relies on TokenMetrics supporting the + operator.
    """
    total_interactions: int = 0
    metrics_by_model: Dict[str, TokenMetrics] = field(default_factory=dict)
    metrics_by_agent: Dict[str, TokenMetrics] = field(default_factory=dict)
    total_metrics: TokenMetrics = field(default_factory=TokenMetrics)

    def add_interaction(self, model: str, agent: Optional[str], metrics: TokenMetrics):
        """Add a new interaction's metrics to the session aggregates."""
        self.total_interactions += 1
        # Update total metrics
        self.total_metrics = self.total_metrics + metrics
        # Update model-specific metrics
        if model not in self.metrics_by_model:
            self.metrics_by_model[model] = TokenMetrics()
        self.metrics_by_model[model] = self.metrics_by_model[model] + metrics
        # Update agent-specific metrics (agent may be None when untracked)
        if agent:
            if agent not in self.metrics_by_agent:
                self.metrics_by_agent[agent] = TokenMetrics()
            self.metrics_by_agent[agent] = self.metrics_by_agent[agent] + metrics

    def get_summary(self) -> Dict:
        """Get a summary of session token usage as plain dicts."""
        return {
            "total_interactions": self.total_interactions,
            "total_tokens": self.total_metrics.total_tokens,
            "total_metrics": self.total_metrics.to_dict(),
            "by_model": {
                model: metrics.to_dict()
                for model, metrics in self.metrics_by_model.items()
            },
            "by_agent": {
                agent: metrics.to_dict()
                for agent, metrics in self.metrics_by_agent.items()
            },
        }
class TokenCollector:
    """
    Global token collector for tracking token usage across the application.
    Thread-safe implementation for concurrent access.
    """

    def __init__(self):
        # NOTE: this must be __init__ — the pasted diff lost the dunder
        # underscores, which would leave the collector unconstructed.
        self._lock = threading.Lock()
        self._session_metrics = SessionTokenMetrics()
        self._recent_interactions: List[Dict] = []
        self._max_recent = 100  # cap on retained per-interaction records

    def track_tokens(
        self,
        model: str,
        agent: Optional[str],
        metrics: TokenMetrics,
        metadata: Optional[Dict] = None,
    ):
        """Track token usage for an interaction."""
        with self._lock:
            # Add to session metrics
            self._session_metrics.add_interaction(model, agent, metrics)
            # Track recent interaction
            interaction = {
                "timestamp": datetime.now().isoformat(),
                "model": model,
                "agent": agent,
                "metrics": metrics.to_dict(),
                "metadata": metadata or {},
            }
            self._recent_interactions.append(interaction)
            # Limit recent interactions (drop oldest first)
            if len(self._recent_interactions) > self._max_recent:
                self._recent_interactions.pop(0)

    def get_session_summary(self) -> Dict:
        """Get summary of token usage for the session."""
        with self._lock:
            return self._session_metrics.get_summary()

    def get_recent_interactions(self, limit: int = 10) -> List[Dict]:
        """Get the most recent interactions with token metrics."""
        with self._lock:
            return self._recent_interactions[-limit:]

    def reset(self):
        """Reset all collected metrics."""
        with self._lock:
            self._session_metrics = SessionTokenMetrics()
            self._recent_interactions.clear()

    def export_metrics(self) -> Dict:
        """Export all metrics for external use."""
        with self._lock:
            return {
                "session": self._session_metrics.get_summary(),
                "recent_interactions": self._recent_interactions.copy(),
            }


# Global token collector instance
+_token_collector = TokenCollector()

2. Modifications to
praisonaiagents/llm/llm.py--- a/praisonaiagents/llm/llm.py
+++ b/praisonaiagents/llm/llm.py
@@ -1,6 +1,7 @@
import os
import time
import litellm
+from typing import Optional, Dict, Any
from pathlib import Path
import logging@@ -24,6 +25,11 @@ except ImportError:
except ImportError:
pass+# Import token tracking
+try:
- from ..telemetry.token_collector import TokenMetrics, _token_collector
+except ImportError:- TokenMetrics = None
- _token_collector = None
Check for tools import
try:
@@ -128,6 +134,10 @@ class LLM:
model_window_dict = {}
self.model_window_dict = model_window_dict
# Token trackingself.last_token_metrics: Optional[TokenMetrics] = Noneself.session_token_metrics: Optional[TokenMetrics] = Noneself.current_agent_name: Optional[str] = None # Initialize with new clients if available if self._client_module:@@ -797,6 +807,54 @@ class LLM:
return relevant_messages
def _track_token_usage(self, response: Dict[str, Any], model: str) -> Optional[TokenMetrics]:
"""Extract and track token usage from LLM response."""if not TokenMetrics or not _token_collector:return Nonetry:usage = response.get("usage", {})if not usage:return None# Extract token countsmetrics = TokenMetrics(input_tokens=usage.get("prompt_tokens", 0),output_tokens=usage.get("completion_tokens", 0),cached_tokens=usage.get("cached_tokens", 0),reasoning_tokens=usage.get("reasoning_tokens", 0),audio_input_tokens=usage.get("audio_input_tokens", 0),audio_output_tokens=usage.get("audio_output_tokens", 0))# Store metricsself.last_token_metrics = metrics# Update session metricsif not self.session_token_metrics:self.session_token_metrics = TokenMetrics()self.session_token_metrics = self.session_token_metrics + metrics# Track in global collector_token_collector.track_tokens(model=model,agent=self.current_agent_name,metrics=metrics,metadata={"provider": self.provider,"stream": False})return metricsexcept Exception as e:if self.verbose:logging.warning(f"Failed to track token usage: {e}")return Nonedef set_current_agent(self, agent_name: Optional[str]):
"""Set the current agent name for token tracking."""self.current_agent_name = agent_namedef get_response(self, messages, model=None, tools=None, tool_choice=None, agent_name=None, output_json=None, output_pydantic=None, temperature=None, stream=None, task_status_callback=None, current_task_id=None, task_name=None, task_description=None, **kwargs):
model = model or self.model_name or os.environ.get("MODEL_NAME", "gpt-4o-mini")
stream = stream if stream is not None else self.stream
@@ -935,6 +993,9 @@ class LLM:
)content = final_response["choices"][0]["message"]["content"]# Track token usageself._track_token_usage(final_response, model) except litellm.exceptions.RateLimitError as e: wait_time = 60 if self.verbose or (self.verbose_level and self.verbose_level > 1):@@ -1113,6 +1174,9 @@ class LLM:
)content = final_response["choices"][0]["message"]["content"]
# Track token usageself._track_token_usage(final_response, model) except litellm.exceptions.RateLimitError as e: wait_time = 60 if self.verbose or (self.verbose_level and self.verbose_level > 1):@@ -1259,6 +1323,9 @@ class LLM:
)content = final_response["choices"][0]["message"]["content"]
# Track token usageself._track_token_usage(final_response, model) except litellm.exceptions.RateLimitError as e: wait_time = 60 if self.verbose or (self.verbose_level and self.verbose_level > 1):3. Modifications to
praisonaiagents/main.py--- a/praisonaiagents/main.py
+++ b/praisonaiagents/main.py
@@ -19,6 +19,11 @@ from typing import Any, Optional, Dict, List, Union, CallableImport config if available
from ._config import config
+# Import token metrics if available
+try:
- from .telemetry.token_collector import TokenMetrics
+except ImportError:- TokenMetrics = None
Store registered callbacks
_interaction_callbacks: List[Callable] = []
@@ -412,6 +417,7 @@ class TaskOutput(BaseModel):
json_dict: Optional[Dict[str, Any]] = None
agent: str
output_format: Literal["RAW", "JSON", "Pydantic"] = "RAW"
token_metrics: Optional['TokenMetrics'] = None # Add token metrics field
def json(self) -> Optional[str]:
if self.output_format == "JSON" and self.json_dict:

4. Modifications to
praisonaiagents/agents/agents.py--- a/praisonaiagents/agents/agents.py
+++ b/praisonaiagents/agents/agents.py
@@ -69,6 +69,11 @@ try:
except ImportError:
load_dotenv = None+# Import token tracking
+try:
- from ..telemetry.token_collector import _token_collector
+except ImportError:- _token_collector = None
Environment setup
if load_dotenv:
load_dotenv()
@@ -715,6 +720,11 @@ class PraisonAIAgents:
else:
logger.info(f"Agent {executor_agent.name} is executing the task with description: {task.description}")
# Set current agent for token trackingif hasattr(executor_agent, 'llm') and hasattr(executor_agent.llm, 'set_current_agent'):executor_agent.llm.set_current_agent(executor_agent.name)# Execute the task agent_output = executor_agent.chat( task_prompt, tools=task.tools,@@ -748,9 +758,17 @@ class PraisonAIAgents:
raw=agent_output,
agent=executor_agent.name,
output_format="RAW"
)
# Add token metrics if availableif hasattr(executor_agent, 'llm') and hasattr(executor_agent.llm, 'last_token_metrics'):token_metrics = executor_agent.llm.last_token_metricsif token_metrics:task_output.token_metrics = token_metrics# Process output formats if task.output_json: cleaned = self.clean_json_output(agent_output) try:@@ -2122,6 +2140,78 @@ class PraisonAIAgents:
atexit.register(self.cleanup)
self._cleanup_registered = True
def get_token_usage_summary(self) -> Dict[str, Any]:
    """Get a summary of token usage across all agents and tasks.

    Returns an error dict when the optional token-tracking module is absent.
    """
    if not _token_collector:
        return {"error": "Token tracking not available"}
    return _token_collector.get_session_summary()

def get_detailed_token_report(self) -> Dict[str, Any]:
    """Get a detailed token usage report, including a rough cost estimate."""
    if not _token_collector:
        return {"error": "Token tracking not available"}
    summary = _token_collector.get_session_summary()
    recent = _token_collector.get_recent_interactions(limit=20)
    # Calculate cost estimates (example rates, not real provider pricing)
    cost_per_1k_input = 0.0005   # $0.0005 per 1K input tokens
    cost_per_1k_output = 0.0015  # $0.0015 per 1K output tokens
    total_metrics = summary.get("total_metrics", {})
    input_cost = (total_metrics.get("input_tokens", 0) / 1000) * cost_per_1k_input
    output_cost = (total_metrics.get("output_tokens", 0) / 1000) * cost_per_1k_output
    total_cost = input_cost + output_cost
    return {
        "summary": summary,
        "recent_interactions": recent,
        "cost_estimate": {
            "input_cost": f"${input_cost:.4f}",
            "output_cost": f"${output_cost:.4f}",
            "total_cost": f"${total_cost:.4f}",
            "note": "Cost estimates based on example rates",
        },
    }

def display_token_usage(self):
    """Display token usage in a formatted table on stdout."""
    if not _token_collector:
        print("Token tracking not available")
        return
    summary = _token_collector.get_session_summary()
    print("\n" + "="*50)
    print("TOKEN USAGE SUMMARY")
    print("="*50)
    total_metrics = summary.get("total_metrics", {})
    print(f"\nTotal Interactions: {summary.get('total_interactions', 0)}")
    print(f"Total Tokens: {total_metrics.get('total_tokens', 0):,}")
    print(f" - Input Tokens: {total_metrics.get('input_tokens', 0):,}")
    print(f" - Output Tokens: {total_metrics.get('output_tokens', 0):,}")
    print(f" - Cached Tokens: {total_metrics.get('cached_tokens', 0):,}")
    print(f" - Reasoning Tokens: {total_metrics.get('reasoning_tokens', 0):,}")
    # By model
    by_model = summary.get("by_model", {})
    if by_model:
        print("\nUsage by Model:")
        for model, metrics in by_model.items():
            print(f" {model}: {metrics.get('total_tokens', 0):,} tokens")
    # By agent
    by_agent = summary.get("by_agent", {})
    if by_agent:
        print("\nUsage by Agent:")
        for agent, metrics in by_agent.items():
            print(f" {agent}: {metrics.get('total_tokens', 0):,} tokens")
    print("="*50 + "\n")
all = ['PraisonAIAgents']
5. New File:
praisonaiagents/telemetry/token_telemetry.py+++ b/praisonaiagents/telemetry/token_telemetry.py
@@ -0,0 +1,86 @@
+"""
+Token telemetry integration for bridging token tracking with the main telemetry system.
+"""
+
+from typing import Optional, Dict, Any
+import logging
+
+# Import dependencies
+from .token_collector import _token_collector, TokenMetrics
+from .telemetry import get_telemetry
+
+logger = logging.getLogger(name)
+
+
class TokenTelemetryBridge:
    """Bridges token tracking with the main telemetry system.

    All methods are best-effort: telemetry failures are logged at debug level
    and never propagate, so token tracking cannot break the agent run.
    """

    def __init__(self):
        # NOTE: must be __init__ — the pasted diff lost the dunder underscores.
        self.telemetry = get_telemetry()
        self.enabled = self.telemetry.enabled if self.telemetry else False

    def track_token_usage(
        self,
        event_type: str,
        model: str,
        agent: Optional[str],
        metrics: TokenMetrics,
        metadata: Optional[Dict[str, Any]] = None,
    ):
        """Track a token usage event in telemetry."""
        if not self.enabled or not self.telemetry:
            return
        try:
            # Prepare event data
            event_data = {
                "event_type": event_type,
                "model": model,
                "agent": agent,
                "total_tokens": metrics.total_tokens,
                "input_tokens": metrics.input_tokens,
                "output_tokens": metrics.output_tokens,
                "cached_tokens": metrics.cached_tokens,
                "reasoning_tokens": metrics.reasoning_tokens,
            }
            # Add metadata if provided
            if metadata:
                event_data.update(metadata)
            # Track in telemetry
            self.telemetry.track_feature_usage("token_usage", event_data)
        except Exception as e:
            logger.debug(f"Failed to track token usage in telemetry: {e}")

    def export_token_metrics(self) -> Dict[str, Any]:
        """Export token metrics for telemetry reporting."""
        if not _token_collector:
            return {}
        try:
            metrics = _token_collector.export_metrics()
            # Prepare telemetry-friendly format
            return {
                "token_metrics": {
                    "session_summary": metrics.get("session", {}),
                    "total_interactions": metrics.get("session", {}).get("total_interactions", 0),
                    "total_tokens": metrics.get("session", {}).get("total_tokens", 0),
                }
            }
        except Exception as e:
            logger.debug(f"Failed to export token metrics: {e}")
            return {}

    def reset_token_metrics(self):
        """Reset token metrics collection."""
        if _token_collector:
            _token_collector.reset()


# Global telemetry bridge instance
_token_telemetry_bridge = TokenTelemetryBridge()

# Export convenience functions
track_token_usage = _token_telemetry_bridge.track_token_usage
+export_token_metrics = _token_telemetry_bridge.export_token_metrics

Summary
The token tracking implementation adds comprehensive token usage monitoring to praisonaiagents. The changes are minimal and modular:
- New token collector module for centralized tracking
- LLM integration to extract token metrics from responses
- TaskOutput enhancement to include token metrics
- Agents integration to capture and display token usage
- Telemetry bridge for privacy-respecting analytics
Key features:
- ✅ Tracks input/output/cached/reasoning tokens
- ✅ Session and agent-level aggregation
- ✅ Thread-safe implementation
- ✅ Privacy-first design
- ✅ Cost estimation support
- ✅ Minimal code changes
- ✅ Backward compatible