Source code for agentscope.web.gradio.utils

# -*- coding: utf-8 -*-
"""web ui utils"""
import os
import threading
from typing import Optional
import hashlib
from multiprocessing import Queue
from queue import Empty
from collections import defaultdict

from PIL import Image

SYS_MSG_PREFIX = "【SYSTEM】"

thread_local_data = threading.local()


[docs] def init_uid_queues() -> dict: """Initializes and returns a dictionary of user-specific queues.""" return { "glb_queue_chat_msg": Queue(), "glb_queue_user_input": Queue(), "glb_queue_reset_msg": Queue(), }
glb_uid_dict = defaultdict(init_uid_queues)
[docs] def send_msg( msg: str, is_player: bool = False, role: Optional[str] = None, uid: Optional[str] = None, flushing: bool = False, avatar: Optional[str] = None, msg_id: Optional[str] = None, ) -> None: """Sends a message to the web UI.""" global glb_uid_dict glb_queue_chat_msg = glb_uid_dict[uid]["glb_queue_chat_msg"] if is_player: glb_queue_chat_msg.put( [ { "text": msg, "name": role, "flushing": flushing, "avatar": avatar, }, None, ], ) else: glb_queue_chat_msg.put( [ None, { "text": msg, "name": role, "flushing": flushing, "avatar": avatar, "id": msg_id, }, ], )
[docs] def get_chat_msg(uid: Optional[str] = None) -> list: """Retrieves the next chat message from the queue, if available.""" global glb_uid_dict glb_queue_chat_msg = glb_uid_dict[uid]["glb_queue_chat_msg"] if not glb_queue_chat_msg.empty(): line = glb_queue_chat_msg.get(block=False) if line is not None: return line return []
[docs] def send_player_input(msg: str, uid: Optional[str] = None) -> None: """Sends player input to the web UI.""" global glb_uid_dict glb_queue_user_input = glb_uid_dict[uid]["glb_queue_user_input"] glb_queue_user_input.put([None, msg])
[docs] def get_player_input( timeout: Optional[int] = None, uid: Optional[str] = None, ) -> str: """Gets player input from the web UI or command line.""" global glb_uid_dict glb_queue_user_input = glb_uid_dict[uid]["glb_queue_user_input"] if timeout: try: content = glb_queue_user_input.get(block=True, timeout=timeout)[1] except Empty as exc: raise TimeoutError("timed out") from exc else: content = glb_queue_user_input.get(block=True)[1] if content == "**Reset**": glb_uid_dict[uid] = init_uid_queues() raise ResetException return content
[docs] def send_reset_msg(uid: Optional[str] = None) -> None: """Sends a reset message to the web UI.""" uid = check_uuid(uid) global glb_uid_dict glb_queue_reset_msg = glb_uid_dict[uid]["glb_queue_reset_msg"] glb_queue_reset_msg.put([None, "**Reset**"]) send_player_input("**Reset**", uid)
[docs] def get_reset_msg(uid: Optional[str] = None) -> None: """Retrieves a reset message from the queue, if available.""" global glb_uid_dict glb_queue_reset_msg = glb_uid_dict[uid]["glb_queue_reset_msg"] if not glb_queue_reset_msg.empty(): content = glb_queue_reset_msg.get(block=True)[1] if content == "**Reset**": glb_uid_dict[uid] = init_uid_queues() raise ResetException
[docs] class ResetException(Exception): """Custom exception to signal a reset action in the application."""
[docs] def check_uuid(uid: Optional[str]) -> str: """Checks whether a UUID is provided or generates a default one.""" if not uid or uid == "": if os.getenv("MODELSCOPE_ENVIRONMENT") == "studio": import gradio as gr raise gr.Error("Please login first") uid = "local_user" return uid
[docs] def generate_image_from_name(name: str) -> str: """Generates an image based on the hash of the given name.""" from agentscope.manager import FileManager file_manager = FileManager.get_instance() # Using hashlib to generate a hash of the name hash_func = hashlib.md5() hash_func.update(name.encode("utf-8")) hash_value = hash_func.hexdigest() # Extract the first 6 characters of the hash value as the hexadecimal # representation of the color # generate a color value between #000000 and #ffffff color_hex = "#" + hash_value[:6] color_rgb = Image.new("RGB", (1, 1), color_hex).getpixel((0, 0)) # If the image does not exist, generate and save it width, height = 200, 200 image = Image.new("RGB", (width, height), color_rgb) image_filepath = file_manager.save_image(image, f"{name}_image.png") return image_filepath
[docs] def audio2text(audio_path: str) -> str: """Converts audio file at the given path to text using ASR.""" try: from dashscope.audio.asr import RecognitionCallback, Recognition except ImportError as e: raise ImportError( "The package dashscope is not found. Please install it by " "running `pip install dashscope==1.14.1`", ) from e callback = RecognitionCallback() rec = Recognition( model="paraformer-realtime-v1", format="wav", sample_rate=16000, callback=callback, ) result = rec.call(audio_path) return " ".join([s["text"] for s in result["output"]["sentence"]])
[docs] def cycle_dots(text: str, num_dots: int = 3) -> str: """display thinking dots before agent reply""" current_dots = len(text) - len(text.rstrip(".")) next_dots = (current_dots + 1) % (num_dots + 1) if next_dots == 0: next_dots = 1 return text.rstrip(".") + "." * next_dots
[docs] def user_input( prefix: str = "User input: ", timeout: Optional[int] = None, ) -> str: """get user input""" if hasattr(thread_local_data, "uid"): get_reset_msg(uid=thread_local_data.uid) content = get_player_input( timeout=timeout, uid=thread_local_data.uid, ) else: if timeout: from inputimeout import inputimeout, TimeoutOccurred try: content = inputimeout(prefix, timeout=timeout) except TimeoutOccurred as exc: raise TimeoutError("timed out") from exc else: content = input(prefix) return content