Source code for agentscope.manager._monitor

# -*- coding: utf-8 -*-
"""The manager of monitor module."""
import os
from typing import Any, Optional, List, Union
from pathlib import Path

from loguru import logger
from sqlalchemy import Column, Integer, String, create_engine, text
from sqlalchemy.ext.declarative import declarative_base, DeclarativeMeta
from sqlalchemy.orm import sessionmaker

from ._file import FileManager
from ..utils.common import _is_windows
from ..constants import (
    _DEFAULT_SQLITE_DB_NAME,
    _DEFAULT_TABLE_NAME_FOR_CHAT_AND_EMBEDDING,
    _DEFAULT_TABLE_NAME_FOR_IMAGE,
)

# Declarative base shared by the ORM table classes defined in this module;
# its metadata is what `create_all` uses to create the monitor tables.
_Base: DeclarativeMeta = declarative_base()


class _ModelTable(_Base):  # mypy: ignore
    """The table for invocation records of chat and embedding models.

    Each row holds the token counts of one recorded invocation. The monitor
    manager aggregates the rows per model name through a SQL view.
    """

    __tablename__ = _DEFAULT_TABLE_NAME_FOR_CHAT_AND_EMBEDDING

    # Surrogate primary key, auto-incremented by the database
    id = Column(Integer, primary_key=True, autoincrement=True)
    # Name of the invoked model (the key the usage view groups by)
    model_name = Column(String(50))
    # Number of tokens in the prompt of this invocation
    prompt_tokens = Column(Integer, default=0)
    # Number of tokens in the completion of this invocation
    completion_tokens = Column(Integer, default=0)
    # Total tokens of this invocation; the writer in this module stores
    # prompt_tokens + completion_tokens here
    total_tokens = Column(Integer, default=0)


class _ImageModelTable(_Base):
    """The table for invocation records of image models.

    Each row holds the resolution and the image count of one recorded
    invocation. The monitor manager aggregates the rows per model name and
    resolution through a SQL view.
    """

    __tablename__ = _DEFAULT_TABLE_NAME_FOR_IMAGE

    # Surrogate primary key, auto-incremented by the database
    id = Column(Integer, primary_key=True, autoincrement=True)
    # Name of the invoked image model
    model_name = Column(String(50))
    # Resolution of the images, stored as a string.
    # NOTE(review): the length 59 differs from the 50 used for `model_name`;
    # possibly a typo for 50 -- confirm before changing the schema.
    resolution = Column(String(59))
    # Number of images involved in this invocation
    image_count = Column(Integer, default=0)


class MonitorManager:
    """The manager of monitor module.

    A singleton that records model invocations (token usage of chat and
    embedding models, image counts of image models) in a SQLite database
    located in the run directory of the file manager, and reports the
    aggregated usage through two SQL views.
    """

    _instance = None

    def __new__(cls, *args: Any, **kwargs: Any) -> Any:
        """Create a singleton instance.

        Raises:
            `RuntimeError`: If an instance has already been created.
        """
        if cls._instance is not None:
            raise RuntimeError(
                "The monitor manager has been initialized. Try to use "
                "MonitorManager.get_instance() to get the instance.",
            )
        cls._instance = super(MonitorManager, cls).__new__(cls)
        return cls._instance

    @property
    def path_db(self) -> Union[str, None]:
        """The absolute path to the database, or `None` when the run
        directory of the file manager is not set."""
        run_dir = FileManager.get_instance().run_dir
        if run_dir is None:
            return None
        return os.path.abspath(os.path.join(run_dir, _DEFAULT_SQLITE_DB_NAME))

    def __init__(self) -> None:
        """Initialize the monitor manager with monitoring disabled."""
        self.use_monitor = False

        # Session factory and engine are created lazily by
        # `_create_monitor_db`
        self.session = None
        self.engine = None

        # The name of the views
        self.view_chat_and_embedding = "view_chat_and_embedding"
        self.view_image = "view_image"

    def initialize(self, use_monitor: bool) -> None:
        """Initialize the monitor manager.

        Args:
            use_monitor (`bool`):
                Whether to use the monitor. When `True`, the database, its
                tables and its views are created.
        """
        self.use_monitor = use_monitor
        if use_monitor:
            self._create_monitor_db()

    @classmethod
    def get_instance(cls) -> "MonitorManager":
        """Get the instance of the singleton class.

        Raises:
            `ValueError`: If no instance has been created yet.
        """
        if cls._instance is None:
            raise ValueError(
                "AgentScope hasn't been initialized. Please call "
                "`agentscope.init` function first.",
            )
        return cls._instance

    def _print_table(self, title: str, usage: List) -> None:
        """Log the table data, one `|`-delimited line per row.

        Args:
            title (`str`):
                The title logged before the rows.
            usage (`List`):
                The rows to display. A table that holds a single row (i.e.
                only the headers) gets an extra placeholder row of dashes.
        """
        # TODO: use a better way to display the table data
        if len(usage) == 1:
            rows = usage + [["-"] * len(usage[0])]
        else:
            rows = usage

        # Every column is as wide as its longest cell; `zip(*rows)` also
        # copes with an empty table, for which nothing but the title is
        # logged.
        col_widths = [
            max(len(str(cell)) for cell in column) for column in zip(*rows)
        ]

        logger.info(title)
        for row in rows:
            cells = [
                str(cell).center(width + 2, " ")
                for cell, width in zip(row, col_widths)
            ]
            # The empty strings yield the leading and trailing delimiter
            logger.info("|".join([""] + cells + [""]))

    def _create_monitor_db(self) -> None:
        """Create the database, its tables, its views and the session
        factory.

        Raises:
            `RuntimeError`: If the run directory of the file manager is not
                initialized.
        """
        db_path = self.path_db
        if db_path is None:
            raise RuntimeError(
                "The run_dir in file manager is not initialized.",
            )

        # `sqlite:///` followed by an absolute path is the documented URL
        # form on every platform: `sqlite:///C:\...` on Windows and
        # `sqlite:////abs/path` on POSIX (the fourth slash is the root of
        # the path itself).
        path = Path(os.path.abspath(db_path))
        self.engine = create_engine(f"sqlite:///{path}")

        # Create tables
        _Base.metadata.create_all(self.engine)

        # `Engine.begin()` commits when the block succeeds, so persisting
        # the views does not depend on driver-level autocommit of DDL.
        with self.engine.begin() as connection:
            # Create view for chat and embedding models
            connection.execute(
                text(
                    f"""
                    CREATE VIEW IF NOT EXISTS {self.view_chat_and_embedding}
                    AS
                    SELECT model_name,
                           COUNT(*) AS times,
                           SUM(prompt_tokens) AS prompt_tokens,
                           SUM(completion_tokens) AS completion_tokens,
                           SUM(total_tokens) AS total_tokens
                    FROM {_ModelTable.__tablename__}
                    GROUP BY model_name;
                    """,
                ),
            )

            # Create view for text-to-image models
            connection.execute(
                text(
                    f"""
                    CREATE VIEW IF NOT EXISTS {self.view_image}
                    AS
                    SELECT model_name,
                           resolution,
                           COUNT(*) AS times,
                           SUM(image_count) AS image_count
                    FROM {_ImageModelTable.__tablename__}
                    GROUP BY model_name, resolution;
                    """,
                ),
            )

        self.session = sessionmaker(bind=self.engine)

    def _close_monitor_db(self) -> None:
        """Close the monitor database to avoid file occupation error in
        windows."""
        if self.session is not None:
            # `sessionmaker.close_all()` is deprecated in favour of this
            # function, which closes the same (all in-memory) sessions.
            from sqlalchemy.orm import close_all_sessions

            close_all_sessions()

        if self.engine is not None:
            self.engine.dispose()

    def update_image_tokens(
        self,
        model_name: str,
        resolution: str,
        image_count: int,
    ) -> None:
        """Update the record of the image models.

        Does nothing when the monitor is disabled.

        Args:
            model_name (`str`):
                The name of the invoked image model.
            resolution (`str`):
                The resolution of the images.
            image_count (`int`):
                The number of images of this invocation.

        Raises:
            `RuntimeError`: If the monitor is enabled but the DB session is
                not initialized.
        """
        if not self.use_monitor:
            return

        if self.session is None:
            raise RuntimeError("The DB session in monitor is not initialized.")

        with self.session() as sess:
            sess.add(
                _ImageModelTable(
                    model_name=model_name,
                    resolution=resolution,
                    image_count=image_count,
                ),
            )
            sess.commit()

    def update_text_and_embedding_tokens(
        self,
        model_name: str,
        prompt_tokens: int = 0,
        completion_tokens: int = 0,
        total_tokens: Optional[int] = None,
    ) -> None:
        """Update the tokens of a given model.

        Does nothing when the monitor is disabled.

        Args:
            model_name (`str`):
                The name of the invoked model.
            prompt_tokens (`int`, defaults to `0`):
                The number of tokens in the prompt.
            completion_tokens (`int`, defaults to `0`):
                The number of tokens in the completion.
            total_tokens (`Optional[int]`, defaults to `None`):
                The total number of tokens. When given, it must equal
                `prompt_tokens + completion_tokens`; when `None`, that sum
                is recorded.

        Raises:
            `RuntimeError`: If the monitor is enabled but the DB session is
                not initialized.
            `AssertionError`: If `total_tokens` is given and inconsistent
                with the other two counts.
        """
        if not self.use_monitor:
            return

        if self.session is None:
            raise RuntimeError("The DB session in monitor is not initialized.")

        expected_total = prompt_tokens + completion_tokens
        if total_tokens is None:
            total_tokens = expected_total
        else:
            assert (
                total_tokens == expected_total
            ), "total_tokens must equal prompt_tokens + completion_tokens."

        with self.session() as sess:
            sess.add(
                _ModelTable(
                    model_name=model_name,
                    prompt_tokens=prompt_tokens,
                    completion_tokens=completion_tokens,
                    total_tokens=total_tokens,
                ),
            )
            sess.commit()

    def print_llm_usage(self) -> dict:
        """Print the usage of all different model APIs.

        Returns:
            `dict`: The usage records under the keys `"text_and_embedding"`
            and `"image"`.
        """
        text_and_embedding = self.show_text_and_embedding_tokens()
        image = self.show_image_tokens()
        return {
            "text_and_embedding": text_and_embedding,
            "image": image,
        }

    def _fetch_view(self, view_name: str) -> List:
        """Return all rows of the given view, or an empty list when the
        monitor is disabled.

        Raises:
            `RuntimeError`: If the monitor is enabled but the DB engine is
                not initialized.
        """
        if not self.use_monitor:
            return []

        if self.engine is None:
            raise RuntimeError("The DB engine in monitor is not initialized.")

        with self.engine.connect() as connection:
            result = connection.execute(text(f"SELECT * FROM {view_name}"))
            # Copy into a plain list so that callers can concatenate it
            return list(result.fetchall())

    def show_image_tokens(self) -> List[dict]:
        """Show the tokens of all image models.

        Returns:
            `List[dict]`: One dict per (model name, resolution) pair with
            the keys `"model_name"`, `"resolution"`, `"times"` and
            `"image_count"`.
        """
        rows = self._fetch_view(self.view_image)

        headers = [
            "MODEL NAME",
            "RESOLUTION",
            "TIMES",
            "IMAGE COUNT",
        ]
        self._print_table("Image Model:", [headers] + rows)

        return [
            {
                "model_name": row[0],
                "resolution": row[1],
                "times": row[2],
                "image_count": row[3],
            }
            for row in rows
        ]

    def show_text_and_embedding_tokens(self) -> List[dict]:
        """Show the tokens of all models.

        Returns:
            `List[dict]`: One dict per model name with the keys
            `"model_name"`, `"times"`, `"prompt_tokens"`,
            `"completion_tokens"` and `"total_tokens"`.
        """
        rows = self._fetch_view(self.view_chat_and_embedding)

        headers = [
            "MODEL NAME",
            "TIMES",
            "PROMPT TOKENS",
            "COMPLETION TOKENS",
            "TOTAL TOKENS",
        ]
        self._print_table("Text & Embedding Model:", [headers] + rows)

        return [
            {
                "model_name": row[0],
                "times": row[1],
                "prompt_tokens": row[2],
                "completion_tokens": row[3],
                "total_tokens": row[4],
            }
            for row in rows
        ]

    def rm_database(self) -> None:
        """Remove the database file if it exists."""
        # Resolve the property once instead of on every use
        db_path = self.path_db
        if db_path is not None and os.path.exists(db_path):
            os.remove(db_path)

    def state_dict(self) -> dict:
        """Serialize the monitor manager into a dict."""
        return {
            "use_monitor": self.use_monitor,
            "path_db": self.path_db,
        }

    def load_dict(self, data: dict) -> None:
        """Load the monitor manager from a dict.

        Only the `"use_monitor"` entry is read; it is passed to
        `initialize`.
        """
        assert "use_monitor" in data, "Key 'use_monitor' not found in data."
        self.initialize(data["use_monitor"])

    def flush(self) -> None:
        """Flush the monitor manager, i.e. close the database and reset the
        attributes to their initial values."""
        # Close the database before flushing
        self._close_monitor_db()

        self.use_monitor = False
        self.session = None
        self.engine = None

        # The name of the views
        self.view_chat_and_embedding = "view_chat_and_embedding"
        self.view_image = "view_image"