# Source code for agentscope.web.gradio.studio

# -*- coding: utf-8 -*-
"""run web ui"""
import argparse
import os
import sys
import threading
import time
from collections import defaultdict
from typing import Optional, Callable
import traceback

try:
    import gradio as gr
except ImportError:
    gr = None

try:
    import modelscope_studio as mgr
except ImportError:
    mgr = None

from agentscope.web.gradio.utils import (
    send_player_input,
    get_chat_msg,
    SYS_MSG_PREFIX,
    ResetException,
    check_uuid,
    send_msg,
    generate_image_from_name,
    audio2text,
    send_reset_msg,
    thread_local_data,
    cycle_dots,
)
from agentscope.web.gradio.constants import _SPEAK

MAX_NUM_DISPLAY_MSG = 20
FAIL_COUNT_DOWN = 30


def init_uid_list() -> list:
    """Create and return a new, empty list.

    Used as the default factory of the per-user global dictionaries, so each
    call hands back a distinct list object.
    """
    return list()
# Per-user chat state, keyed by user ID. A missing key yields an empty list.
# Chat lines already received for each user (appended to by ``get_chat``).
glb_history_dict = defaultdict(init_uid_list)
# The in-progress "speaking" line currently displayed for each user; an
# empty list when there is none.
glb_doing_signal_dict = defaultdict(init_uid_list)
# IDs of the users for whom a session thread has already been started.
glb_signed_user = []
def reset_glb_var(uid: str) -> None:
    """Clear the cached chat state of a single user.

    Args:
        uid: ID of the user whose history and in-progress line are both
            replaced with fresh empty lists.
    """
    # Only items are assigned (the module-level names are never rebound), so
    # no ``global`` declaration is required.
    for per_user_state in (glb_history_dict, glb_doing_signal_dict):
        per_user_state[uid] = init_uid_list()
def get_chat(uid: str) -> list[list]:
    """Poll for a new chat line and return the messages to display.

    One pending line (if any) is fetched via ``get_chat_msg``. A line whose
    second element carries the ``_SPEAK`` marker text is kept as the user's
    in-progress "speaking" line; any other line is appended to the user's
    history and clears the in-progress line.

    Args:
        uid: The user ID; it is validated with ``check_uuid`` first.

    Returns:
        At most ``MAX_NUM_DISPLAY_MSG`` most recent lines, with the
        in-progress line (if any) placed last.
    """
    uid = check_uuid(uid)

    new_line = get_chat_msg(uid=uid)
    # TODO: Optimize the display effect, currently there is a problem of
    # output display jumping
    if new_line:
        if new_line[1] and new_line[1]["text"] == _SPEAK:
            # Blank out the marker text; the animated dots are filled in
            # below on every poll.
            new_line[1]["text"] = ""
            glb_doing_signal_dict[uid] = new_line
        else:
            glb_history_dict[uid].append(new_line)
            glb_doing_signal_dict[uid] = []

    # This function is polled frequently, so copy only the tail that can
    # actually be displayed instead of the whole, ever-growing history.
    dial_msg = glb_history_dict[uid][-MAX_NUM_DISPLAY_MSG:]

    doing = glb_doing_signal_dict[uid]
    if doing:
        if doing[1]:
            doing[1]["text"] = cycle_dots(doing[1]["text"])
            # A fresh id on every poll; presumably makes the front end treat
            # the line as updated -- TODO confirm against the chatbot widget.
            doing[1]["id"] = str(time.time())
            doing[1]["flushing"] = False
        dial_msg.append(doing)

    return dial_msg[-MAX_NUM_DISPLAY_MSG:]
def send_audio(audio_term: str, uid: str) -> None:
    """Transcribe an audio file and post it to the chat as the player.

    Args:
        audio_term: Path of the audio file, handed to ``audio2text`` and
            embedded in an ``<audio>`` tag of the displayed message.
        uid: The user ID; it is validated with ``check_uuid`` first.
    """
    uid = check_uuid(uid)
    transcript = audio2text(audio_path=audio_term)
    # The transcript is what gets forwarded as the player's input.
    send_player_input(transcript, uid=uid)
    send_msg(
        f"""{transcript} <audio src="{audio_term}"></audio>""",
        is_player=True,
        role="Me",
        uid=uid,
        avatar=None,
    )
def send_image(image_term: str, uid: str) -> None:
    """Post an image to the chat as the player.

    Args:
        image_term: The image source, forwarded as the player's input and
            embedded in an ``<img>`` tag of the displayed message.
        uid: The user ID; it is validated with ``check_uuid`` first.
    """
    uid = check_uuid(uid)
    send_player_input(image_term, uid=uid)
    send_msg(
        f"""<img src="{image_term}"></img>""",
        is_player=True,
        role="Me",
        uid=uid,
        avatar=generate_image_from_name("Me"),
    )
def send_message(msg: str, uid: str) -> str:
    """Post a text message to the chat as the player.

    Args:
        msg: The text, forwarded as the player's input and shown in the chat.
        uid: The user ID; it is validated with ``check_uuid`` first.

    Returns:
        Always an empty string.
    """
    uid = check_uuid(uid)
    send_player_input(msg, uid=uid)
    send_msg(
        msg,
        is_player=True,
        role="Me",
        uid=uid,
        avatar=generate_image_from_name("Me"),
    )
    return ""
def fn_choice(data: "gr.EventData", uid: str) -> None:
    """Forward the value selected in the chatbot widget as player input.

    The ``data`` annotation is written as a string on purpose. A plain
    ``gr.EventData`` annotation is evaluated when the function is defined,
    so importing this module would raise ``AttributeError`` whenever gradio
    is not installed and ``gr`` is ``None``.
    NOTE(review): this assumes the installed gradio resolves string
    annotations (e.g. through ``typing.get_type_hints``) when deciding
    whether to inject the event data -- confirm for the pinned version.

    Args:
        data: The event object; its ``_data["value"]`` entry is forwarded.
        uid: The user ID; it is validated with ``check_uuid`` first.
    """
    uid = check_uuid(uid)
    # pylint: disable=protected-access
    send_player_input(data._data["value"], uid=uid)
def import_function_from_path(
    module_path: str,
    function_name: str,
    module_name: Optional[str] = None,
) -> Callable:
    """Load a Python file as a module and return one attribute of it.

    The directory of the file is put at the front of ``sys.path`` while the
    module executes, and ``sys.path`` is restored to its previous content
    afterwards, whether or not loading succeeded.

    Args:
        module_path: Path of the Python file to load.
        function_name: Name of the attribute to fetch from the module.
        module_name: Name to give the module; defaults to the file name
            without its extension.

    Returns:
        The attribute called ``function_name`` of the loaded module.

    Raises:
        ImportError: If no module spec or loader can be created for the
            file.
        AttributeError: If the loaded module has no attribute called
            ``function_name``. An ``AttributeError`` raised by the script
            itself while it executes propagates unchanged rather than being
            reported as a missing function.
    """
    import importlib.util

    # If a module name is not provided, use the filename (without
    # extension) as the module name
    if module_name is None:
        module_name = os.path.splitext(os.path.basename(module_path))[0]

    script_dir = os.path.dirname(os.path.abspath(module_path))

    # Temporarily add the script directory to sys.path
    original_sys_path = sys.path[:]
    sys.path.insert(0, script_dir)
    try:
        # Creating module specifications and loading modules
        spec = importlib.util.spec_from_file_location(
            module_name,
            module_path,
        )
        if spec is None or spec.loader is None:
            raise ImportError(
                f"Could not find module spec for {module_name} at"
                f" {module_path}",
            )
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
    finally:
        # Restore the original sys.path
        sys.path = original_sys_path

    # Only the attribute lookup is guarded, so that errors coming from the
    # script's own code are not mistaken for a missing entry function.
    try:
        return getattr(module, function_name)
    except AttributeError as exc:
        raise AttributeError(
            f"The module '{module_name}' does not have a function named '"
            f"{function_name}'. Please put your code in the main function, "
            f"read README.md for details.",
        ) from exc
# pylint: disable=too-many-statements
def run_app() -> None:
    """Entry point for the web UI application.

    Parses the ``script`` command-line argument, changes the working
    directory to the directory of that script, builds the Gradio interface
    and launches it. Every new user ID seen by the page gets its own thread
    that runs the script's ``main`` function (``.py`` files) or the workflow
    described by the file (``.json`` files) in an endless loop.

    Raises:
        AssertionError: If gradio is not installed.
        ImportError: If modelscope_studio is not installed.
    """
    assert gr is not None, "Please install [full] version of AgentScope."
    # The chatbot widget comes from modelscope_studio; fail early with a
    # clear message instead of an AttributeError on ``None`` further down.
    if mgr is None:
        raise ImportError(
            "modelscope_studio is required to run the web UI. "
            "Please install [full] version of AgentScope.",
        )

    parser = argparse.ArgumentParser()
    parser.add_argument("script", type=str, help="Script file to run")
    args = parser.parse_args()

    # Make sure script_path is an absolute path
    script_path = os.path.abspath(args.script)

    # Get the directory where the script is located
    script_dir = os.path.dirname(script_path)

    # Change the current working directory to the directory where the
    # script is located
    os.chdir(script_dir)

    def start_game(uid: str) -> None:
        """Run the user's script in an endless loop for one user.

        Args:
            uid: The user ID, stored on ``thread_local_data`` for the
                current thread before the script is run.
        """
        thread_local_data.uid = uid
        if script_path.endswith(".py"):
            main = import_function_from_path(script_path, "main")
        elif script_path.endswith(".json"):
            from agentscope.web.workstation.workflow import (
                start_workflow,
                load_config,
            )

            config = load_config(script_path)
            main = lambda: start_workflow(config)
        else:
            raise ValueError(f"Unrecognized file formats: {script_path}")

        while True:
            try:
                main()
            except ResetException:
                print(f"Reset Successfully:{uid} ")
            except Exception as e:
                # Show the traceback in the chat and count down before the
                # script is started again.
                trace_info = "".join(
                    traceback.TracebackException.from_exception(e).format(),
                )
                for i in range(FAIL_COUNT_DOWN, 0, -1):
                    send_msg(
                        f"{SYS_MSG_PREFIX} error {trace_info}, reboot "
                        f"in {i} seconds",
                        uid=uid,
                    )
                    time.sleep(1)
            # NOTE(review): placed at loop level, so the cached state is
            # cleared before every restart -- confirm the intended nesting.
            reset_glb_var(uid)

    def check_for_new_session(uid: str) -> None:
        """
        Check for a new user session and start a game thread if necessary.
        """
        uid = check_uuid(uid)
        if uid not in glb_signed_user:
            glb_signed_user.append(uid)
            print("==========Signed User==========")
            print(f"Total number of users: {len(glb_signed_user)}")
            run_thread = threading.Thread(
                target=start_game,
                args=(uid,),
            )
            run_thread.start()

    with gr.Blocks() as demo:
        warning_html_code = (
            ' <div class="hint" style="text-align: center;'
            "background-color: rgba(255, 255, 0, 0.15); padding: 10px; "
            "margin: 10px; border-radius: 5px; "
            'border: 1px solid #ffcc00;"> '
            "<p>If you want to start over, please click the "
            "<strong>reset</strong> button and <strong>refresh</strong> "
            "the page</p> </div> "
        )
        gr.HTML(warning_html_code)
        uuid = gr.Textbox(label="modelscope_uuid", visible=False)

        with gr.Row():
            chatbot = mgr.Chatbot(
                label="Dialog",
                show_label=False,
                bubble_full_width=False,
                visible=True,
            )

        with gr.Column():
            user_chat_input = gr.Textbox(
                label="user_chat_input",
                placeholder="Say something here",
                show_label=False,
            )
            send_button = gr.Button(value="📣Send")
        with gr.Row():
            audio = gr.Accordion("Audio input", open=False)
            with audio:
                audio_term = gr.Audio(
                    visible=True,
                    type="filepath",
                    format="wav",
                )
                submit_audio_button = gr.Button(value="Send Audio")
            image = gr.Accordion("Image input", open=False)
            with image:
                image_term = gr.Image(
                    visible=True,
                    height=300,
                    interactive=True,
                    type="filepath",
                )
                submit_image_button = gr.Button(value="Send Image")
        with gr.Column():
            reset_button = gr.Button(value="Reset")

        # submit message
        send_button.click(
            send_message,
            [user_chat_input, uuid],
            user_chat_input,
        )
        user_chat_input.submit(
            send_message,
            [user_chat_input, uuid],
            user_chat_input,
        )

        submit_audio_button.click(
            send_audio,
            inputs=[audio_term, uuid],
            outputs=[audio_term],
        )

        submit_image_button.click(
            send_image,
            inputs=[image_term, uuid],
            outputs=[image_term],
        )

        reset_button.click(send_reset_msg, inputs=[uuid])

        chatbot.custom(fn=fn_choice, inputs=[uuid])

        # Both callbacks are re-run periodically: one starts a session
        # thread for a user ID not seen before, the other refreshes the
        # displayed chat.
        demo.load(
            check_for_new_session,
            inputs=[uuid],
            every=0.5,
        )
        demo.load(
            get_chat,
            inputs=[uuid],
            outputs=[chatbot],
            every=0.5,
        )
    demo.queue()
    demo.launch()
# Launch the web UI when this module is executed as a script.
if __name__ == "__main__":
    run_app()