Skip to content

add token metrics, but try to optimise and make... #1054

@MervinPraison

Description

@MervinPraison

@claude

add token metrics, but try to optimise and make minimal code change on the main core file

Mostly, try to keep this feature away from the core application, so that it can be included only if required.

This shouldn't decrease the processing speed of the agent

Todo List

  • [x] Analyze praisonaiagents structure using multi agents
  • [x] Generate diff for token collector module (telemetry/token_collector.py)
  • [x] Generate diff for LLM integration (llm/llm.py)
  • [x] Generate diff for TaskOutput enhancement (main.py)
  • [x] Generate diff for Agents integration (agents/agents.py)
  • [x] Generate diff for telemetry integration (telemetry/token_telemetry.py)
  • [x] Display all diffs in response

Token Tracking Implementation Diffs

1. New File: praisonaiagents/telemetry/token_collector.py

+++ b/praisonaiagents/telemetry/token_collector.py
@@ -0,0 +1,166 @@
+"""
+Token usage collector for tracking LLM token consumption.
+Provides comprehensive tracking.
+"""
+
+from dataclasses import dataclass, field
+from typing import Dict, Optional, List
+from datetime import datetime
+import threading
+from collections import defaultdict
+
+
+@DataClass
+class TokenMetrics:

  • """Represents token usage metrics for a single LLM interaction."""
  • input_tokens: int = 0
  • output_tokens: int = 0
  • cached_tokens: int = 0
  • reasoning_tokens: int = 0
  • audio_input_tokens: int = 0
  • audio_output_tokens: int = 0
  • @Property
  • def total_tokens(self) -> int:
  •    """Calculate total tokens across all types."""
    
  •    return (
    
  •        self.input_tokens + 
    
  •        self.output_tokens + 
    
  •        self.cached_tokens + 
    
  •        self.reasoning_tokens +
    
  •        self.audio_input_tokens +
    
  •        self.audio_output_tokens
    
  •    )
    
  • def to_dict(self) -> Dict[str, int]:
  •    """Convert to dictionary format."""
    
  •    return {
    
  •        "input_tokens": self.input_tokens,
    
  •        "output_tokens": self.output_tokens,
    
  •        "cached_tokens": self.cached_tokens,
    
  •        "reasoning_tokens": self.reasoning_tokens,
    
  •        "audio_input_tokens": self.audio_input_tokens,
    
  •        "audio_output_tokens": self.audio_output_tokens,
    
  •        "total_tokens": self.total_tokens
    
  •    }
    
  • def add(self, other: 'TokenMetrics') -> 'TokenMetrics':
  •    """Add two TokenMetrics instances together."""
    
  •    if not isinstance(other, TokenMetrics):
    
  •        return NotImplemented
    
  •    return TokenMetrics(
    
  •        input_tokens=self.input_tokens + other.input_tokens,
    
  •        output_tokens=self.output_tokens + other.output_tokens,
    
  •        cached_tokens=self.cached_tokens + other.cached_tokens,
    
  •        reasoning_tokens=self.reasoning_tokens + other.reasoning_tokens,
    
  •        audio_input_tokens=self.audio_input_tokens + other.audio_input_tokens,
    
  •        audio_output_tokens=self.audio_output_tokens + other.audio_output_tokens
    
  •    )
    

@dataclass
class SessionTokenMetrics:
    """Aggregated token metrics for a session."""

    # Number of LLM interactions folded into this session so far.
    total_interactions: int = 0
    # Per-model and per-agent breakdowns, plus the overall running total.
    metrics_by_model: Dict[str, TokenMetrics] = field(default_factory=dict)
    metrics_by_agent: Dict[str, TokenMetrics] = field(default_factory=dict)
    total_metrics: TokenMetrics = field(default_factory=TokenMetrics)

    def add_interaction(self, model: str, agent: Optional[str], metrics: TokenMetrics):
        """Fold a new interaction's metrics into the session aggregates."""
        self.total_interactions += 1
        # Running grand total. Use the explicit .add() method so this works
        # whether TokenMetrics exposes addition as a method or an operator.
        self.total_metrics = self.total_metrics.add(metrics)
        # Per-model aggregation; setdefault replaces the membership-test dance.
        self.metrics_by_model[model] = self.metrics_by_model.setdefault(
            model, TokenMetrics()
        ).add(metrics)
        # Per-agent aggregation; agent may be None when the caller is unknown.
        if agent:
            self.metrics_by_agent[agent] = self.metrics_by_agent.setdefault(
                agent, TokenMetrics()
            ).add(metrics)

    def get_summary(self) -> Dict:
        """Get a summary of session token usage."""
        return {
            "total_interactions": self.total_interactions,
            "total_tokens": self.total_metrics.total_tokens,
            "total_metrics": self.total_metrics.to_dict(),
            "by_model": {
                model: metrics.to_dict()
                for model, metrics in self.metrics_by_model.items()
            },
            "by_agent": {
                agent: metrics.to_dict()
                for agent, metrics in self.metrics_by_agent.items()
            },
        }

class TokenCollector:
    """
    Global token collector for tracking token usage across the application.
    Thread-safe implementation for concurrent access.
    """

    def __init__(self):
        # One lock guards every mutable attribute below, so concurrent LLM
        # calls can track tokens safely.
        self._lock = threading.Lock()
        self._session_metrics = SessionTokenMetrics()
        self._recent_interactions: List[Dict] = []
        # Bounded history keeps memory constant over long sessions.
        self._max_recent = 100

    def track_tokens(
        self,
        model: str,
        agent: Optional[str],
        metrics: TokenMetrics,
        metadata: Optional[Dict] = None,
    ):
        """Track token usage for a single interaction."""
        with self._lock:
            # Fold into the session-wide aggregates.
            self._session_metrics.add_interaction(model, agent, metrics)
            # Record the interaction for the recent-history view.
            interaction = {
                "timestamp": datetime.now().isoformat(),
                "model": model,
                "agent": agent,
                "metrics": metrics.to_dict(),
                "metadata": metadata or {},
            }
            self._recent_interactions.append(interaction)
            # Drop the oldest entry once the cap is exceeded.
            if len(self._recent_interactions) > self._max_recent:
                self._recent_interactions.pop(0)

    def get_session_summary(self) -> Dict:
        """Get summary of token usage for the session."""
        with self._lock:
            return self._session_metrics.get_summary()

    def get_recent_interactions(self, limit: int = 10) -> List[Dict]:
        """Get the most recent interactions with token metrics."""
        with self._lock:
            return self._recent_interactions[-limit:]

    def reset(self):
        """Reset all collected metrics."""
        with self._lock:
            self._session_metrics = SessionTokenMetrics()
            self._recent_interactions.clear()

    def export_metrics(self) -> Dict:
        """Export all metrics for external use."""
        with self._lock:
            return {
                "session": self._session_metrics.get_summary(),
                "recent_interactions": self._recent_interactions.copy(),
            }


# Global token collector instance
_token_collector = TokenCollector()

2. Modifications to praisonaiagents/llm/llm.py

--- a/praisonaiagents/llm/llm.py
+++ b/praisonaiagents/llm/llm.py
@@ -1,6 +1,7 @@
import os
import time
import litellm
+from typing import Optional, Dict, Any
from pathlib import Path
import logging

@@ -24,6 +25,11 @@ except ImportError:
except ImportError:
pass

+# Import token tracking
+try:

  • from ..telemetry.token_collector import TokenMetrics, _token_collector
    +except ImportError:
  • TokenMetrics = None
  • _token_collector = None

Check for tools import

try:
@@ -128,6 +134,10 @@ class LLM:
model_window_dict = {}
self.model_window_dict = model_window_dict

  •    # Token tracking
    
  •    self.last_token_metrics: Optional[TokenMetrics] = None
    
  •    self.session_token_metrics: Optional[TokenMetrics] = None
    
  •    self.current_agent_name: Optional[str] = None
       
       # Initialize with new clients if available
       if self._client_module:
    

@@ -797,6 +807,54 @@ class LLM:

     return relevant_messages
  • def _track_token_usage(self, response: Dict[str, Any], model: str) -> Optional[TokenMetrics]:

  •    """Extract and track token usage from LLM response."""
    
  •    if not TokenMetrics or not _token_collector:
    
  •        return None
    
  •    try:
    
  •        usage = response.get("usage", {})
    
  •        if not usage:
    
  •            return None
    
  •        # Extract token counts
    
  •        metrics = TokenMetrics(
    
  •            input_tokens=usage.get("prompt_tokens", 0),
    
  •            output_tokens=usage.get("completion_tokens", 0),
    
  •            cached_tokens=usage.get("cached_tokens", 0),
    
  •            reasoning_tokens=usage.get("reasoning_tokens", 0),
    
  •            audio_input_tokens=usage.get("audio_input_tokens", 0),
    
  •            audio_output_tokens=usage.get("audio_output_tokens", 0)
    
  •        )
    
  •        # Store metrics
    
  •        self.last_token_metrics = metrics
    
  •        # Update session metrics
    
  •        if not self.session_token_metrics:
    
  •            self.session_token_metrics = TokenMetrics()
    
  •        self.session_token_metrics = self.session_token_metrics + metrics
    
  •        # Track in global collector
    
  •        _token_collector.track_tokens(
    
  •            model=model,
    
  •            agent=self.current_agent_name,
    
  •            metrics=metrics,
    
  •            metadata={
    
  •                "provider": self.provider,
    
  •                "stream": False
    
  •            }
    
  •        )
    
  •        return metrics
    
  •    except Exception as e:
    
  •        if self.verbose:
    
  •            logging.warning(f"Failed to track token usage: {e}")
    
  •        return None
    
  • def set_current_agent(self, agent_name: Optional[str]):

  •    """Set the current agent name for token tracking."""
    
  •    self.current_agent_name = agent_name
    
  • def get_response(self, messages, model=None, tools=None, tool_choice=None, agent_name=None, output_json=None, output_pydantic=None, temperature=None, stream=None, task_status_callback=None, current_task_id=None, task_name=None, task_description=None, **kwargs):
    model = model or self.model_name or os.environ.get("MODEL_NAME", "gpt-4o-mini")
    stream = stream if stream is not None else self.stream
    @@ -935,6 +993,9 @@ class LLM:
    )

               content = final_response["choices"][0]["message"]["content"]
    
  •            # Track token usage
    
  •            self._track_token_usage(final_response, model)
           except litellm.exceptions.RateLimitError as e:
               wait_time = 60
               if self.verbose or (self.verbose_level and self.verbose_level > 1):
    

@@ -1113,6 +1174,9 @@ class LLM:
)

             content = final_response["choices"][0]["message"]["content"]
  •            # Track token usage
    
  •            self._track_token_usage(final_response, model)
           except litellm.exceptions.RateLimitError as e:
               wait_time = 60
               if self.verbose or (self.verbose_level and self.verbose_level > 1):
    

@@ -1259,6 +1323,9 @@ class LLM:
)

             content = final_response["choices"][0]["message"]["content"]
  •            # Track token usage
    
  •            self._track_token_usage(final_response, model)
           except litellm.exceptions.RateLimitError as e:
               wait_time = 60
               if self.verbose or (self.verbose_level and self.verbose_level > 1):
    

3. Modifications to praisonaiagents/main.py

--- a/praisonaiagents/main.py
+++ b/praisonaiagents/main.py
@@ -19,6 +19,11 @@ from typing import Any, Optional, Dict, List, Union, Callable

Import config if available

from ._config import config
+# Import token metrics if available
+try:

  • from .telemetry.token_collector import TokenMetrics
    +except ImportError:
  • TokenMetrics = None

Store registered callbacks

_interaction_callbacks: List[Callable] = []
@@ -412,6 +417,7 @@ class TaskOutput(BaseModel):
json_dict: Optional[Dict[str, Any]] = None
agent: str
output_format: Literal["RAW", "JSON", "Pydantic"] = "RAW"

  • token_metrics: Optional['TokenMetrics'] = None # Add token metrics field

    def json(self) -> Optional[str]:
    if self.output_format == "JSON" and self.json_dict:

4. Modifications to praisonaiagents/agents/agents.py

--- a/praisonaiagents/agents/agents.py
+++ b/praisonaiagents/agents/agents.py
@@ -69,6 +69,11 @@ try:
except ImportError:
load_dotenv = None

+# Import token tracking
+try:

  • from ..telemetry.token_collector import _token_collector
    +except ImportError:
  • _token_collector = None

Environment setup

if load_dotenv:
load_dotenv()
@@ -715,6 +720,11 @@ class PraisonAIAgents:
else:
logger.info(f"Agent {executor_agent.name} is executing the task with description: {task.description}")

  •        # Set current agent for token tracking
    
  •        if hasattr(executor_agent, 'llm') and hasattr(executor_agent.llm, 'set_current_agent'):
    
  •            executor_agent.llm.set_current_agent(executor_agent.name)
    
  •        # Execute the task
           agent_output = executor_agent.chat(
               task_prompt,
               tools=task.tools,
    

@@ -748,9 +758,17 @@ class PraisonAIAgents:
raw=agent_output,
agent=executor_agent.name,
output_format="RAW"
)

  •        # Add token metrics if available
    
  •        if hasattr(executor_agent, 'llm') and hasattr(executor_agent.llm, 'last_token_metrics'):
    
  •            token_metrics = executor_agent.llm.last_token_metrics
    
  •            if token_metrics:
    
  •                task_output.token_metrics = token_metrics
    
  •        # Process output formats
           if task.output_json:
               cleaned = self.clean_json_output(agent_output)
               try:
    

@@ -2122,6 +2140,78 @@ class PraisonAIAgents:
atexit.register(self.cleanup)
self._cleanup_registered = True

  • def get_token_usage_summary(self) -> Dict[str, Any]:
  •    """Get a summary of token usage across all agents and tasks."""
    
  •    if not _token_collector:
    
  •        return {"error": "Token tracking not available"}
    
  •    return _token_collector.get_session_summary()
    
  • def get_detailed_token_report(self) -> Dict[str, Any]:
  •    """Get a detailed token usage report."""
    
  •    if not _token_collector:
    
  •        return {"error": "Token tracking not available"}
    
  •    summary = _token_collector.get_session_summary()
    
  •    recent = _token_collector.get_recent_interactions(limit=20)
    
  •    # Calculate cost estimates (example rates)
    
  •    cost_per_1k_input = 0.0005  # $0.0005 per 1K input tokens
    
  •    cost_per_1k_output = 0.0015  # $0.0015 per 1K output tokens
    
  •    total_metrics = summary.get("total_metrics", {})
    
  •    input_cost = (total_metrics.get("input_tokens", 0) / 1000) * cost_per_1k_input
    
  •    output_cost = (total_metrics.get("output_tokens", 0) / 1000) * cost_per_1k_output
    
  •    total_cost = input_cost + output_cost
    
  •    return {
    
  •        "summary": summary,
    
  •        "recent_interactions": recent,
    
  •        "cost_estimate": {
    
  •            "input_cost": f"${input_cost:.4f}",
    
  •            "output_cost": f"${output_cost:.4f}",
    
  •            "total_cost": f"${total_cost:.4f}",
    
  •            "note": "Cost estimates based on example rates"
    
  •        }
    
  •    }
    
  • def display_token_usage(self):
  •    """Display token usage in a formatted table."""
    
  •    if not _token_collector:
    
  •        print("Token tracking not available")
    
  •        return
    
  •    summary = _token_collector.get_session_summary()
    
  •    print("\n" + "="*50)
    
  •    print("TOKEN USAGE SUMMARY")
    
  •    print("="*50)
    
  •    total_metrics = summary.get("total_metrics", {})
    
  •    print(f"\nTotal Interactions: {summary.get('total_interactions', 0)}")
    
  •    print(f"Total Tokens: {total_metrics.get('total_tokens', 0):,}")
    
  •    print(f"  - Input Tokens: {total_metrics.get('input_tokens', 0):,}")
    
  •    print(f"  - Output Tokens: {total_metrics.get('output_tokens', 0):,}")
    
  •    print(f"  - Cached Tokens: {total_metrics.get('cached_tokens', 0):,}")
    
  •    print(f"  - Reasoning Tokens: {total_metrics.get('reasoning_tokens', 0):,}")
    
  •    # By model
    
  •    by_model = summary.get("by_model", {})
    
  •    if by_model:
    
  •        print("\nUsage by Model:")
    
  •        for model, metrics in by_model.items():
    
  •            print(f"  {model}: {metrics.get('total_tokens', 0):,} tokens")
    
  •    # By agent
    
  •    by_agent = summary.get("by_agent", {})
    
  •    if by_agent:
    
  •        print("\nUsage by Agent:")
    
  •        for agent, metrics in by_agent.items():
    
  •            print(f"  {agent}: {metrics.get('total_tokens', 0):,} tokens")
    
  •    print("="*50 + "\n")
    

Public API exports

all = ['PraisonAIAgents']

5. New File: praisonaiagents/telemetry/token_telemetry.py

+++ b/praisonaiagents/telemetry/token_telemetry.py
@@ -0,0 +1,86 @@
+"""
+Token telemetry integration for bridging token tracking with the main telemetry system.
+"""
+
+from typing import Optional, Dict, Any
+import logging
+
+# Import dependencies
+from .token_collector import _token_collector, TokenMetrics
+from .telemetry import get_telemetry
+
+logger = logging.getLogger(name)
+
+
+class TokenTelemetryBridge:

  • """Bridges token tracking with the main telemetry system."""
  • def init(self):
  •    self.telemetry = get_telemetry()
    
  •    self.enabled = self.telemetry.enabled if self.telemetry else False
    
  • def track_token_usage(
  •    self, 
    
  •    event_type: str,
    
  •    model: str,
    
  •    agent: Optional[str],
    
  •    metrics: TokenMetrics,
    
  •    metadata: Optional[Dict[str, Any]] = None
    
  • ):
  •    """Track token usage event in telemetry."""
    
  •    if not self.enabled or not self.telemetry:
    
  •        return
    
  •    try:
    
  •        # Prepare event data
    
  •        event_data = {
    
  •            "event_type": event_type,
    
  •            "model": model,
    
  •            "agent": agent,
    
  •            "total_tokens": metrics.total_tokens,
    
  •            "input_tokens": metrics.input_tokens,
    
  •            "output_tokens": metrics.output_tokens,
    
  •            "cached_tokens": metrics.cached_tokens,
    
  •            "reasoning_tokens": metrics.reasoning_tokens,
    
  •        }
    
  •        # Add metadata if provided
    
  •        if metadata:
    
  •            event_data.update(metadata)
    
  •        # Track in telemetry
    
  •        self.telemetry.track_feature_usage("token_usage", event_data)
    
  •    except Exception as e:
    
  •        logger.debug(f"Failed to track token usage in telemetry: {e}")
    
  • def export_token_metrics(self) -> Dict[str, Any]:
  •    """Export token metrics for telemetry reporting."""
    
  •    if not _token_collector:
    
  •        return {}
    
  •    try:
    
  •        metrics = _token_collector.export_metrics()
    
  •        # Prepare telemetry-friendly format
    
  •        return {
    
  •            "token_metrics": {
    
  •                "session_summary": metrics.get("session", {}),
    
  •                "total_interactions": metrics.get("session", {}).get("total_interactions", 0),
    
  •                "total_tokens": metrics.get("session", {}).get("total_tokens", 0),
    
  •            }
    
  •        }
    
  •    except Exception as e:
    
  •        logger.debug(f"Failed to export token metrics: {e}")
    
  •        return {}
    
  • def reset_token_metrics(self):
  •    """Reset token metrics collection."""
    
  •    if _token_collector:
    
  •        _token_collector.reset()
    

+# Global telemetry bridge instance
+_token_telemetry_bridge = TokenTelemetryBridge()
+
+# Export convenience functions
+track_token_usage = _token_telemetry_bridge.track_token_usage
+export_token_metrics = _token_telemetry_bridge.export_token_metrics

Summary

The token tracking implementation adds comprehensive token usage monitoring to praisonaiagents. The changes are minimal and modular:

  1. New token collector module for centralized tracking
  2. LLM integration to extract token metrics from responses
  3. TaskOutput enhancement to include token metrics
  4. Agents integration to capture and display token usage
  5. Telemetry bridge for privacy-respecting analytics

Key features:

  • ✅ Tracks input/output/cached/reasoning tokens
  • ✅ Session and agent-level aggregation
  • ✅ Thread-safe implementation
  • ✅ Privacy-first design
  • ✅ Cost estimation support
  • ✅ Minimal code changes
  • ✅ Backward compatible

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions