Source code for agentscope.utils.common

# -*- coding: utf-8 -*-
""" Common utils."""
import base64
import contextlib
import datetime
import hashlib
import json
import os
import random
import re
import secrets
import signal
import socket
import string
import sys
import tempfile
import threading
from typing import Any, Generator, Optional, Union, Tuple, Literal, List
from urllib.parse import urlparse

import psutil
import requests


[docs] @contextlib.contextmanager def timer(seconds: Optional[Union[int, float]] = None) -> Generator: """ A context manager that limits the execution time of a code block to a given number of seconds. The implementation of this contextmanager are borrowed from https://github.com/openai/human-eval/blob/master/human_eval/execution.py Note: This function only works in Unix and MainThread, since `signal.setitimer` is only available in Unix. """ if ( seconds is None or sys.platform == "win32" or threading.currentThread().name # pylint: disable=W4902 != "MainThread" ): yield return def signal_handler(*args: Any, **kwargs: Any) -> None: raise TimeoutError("timed out") signal.setitimer(signal.ITIMER_REAL, seconds) signal.signal(signal.SIGALRM, signal_handler) try: # Enter the context and execute the code block. yield finally: signal.setitimer(signal.ITIMER_REAL, 0)
[docs] @contextlib.contextmanager def create_tempdir() -> Generator: """ A context manager that creates a temporary directory and changes the current working directory to it. The implementation of this contextmanager are borrowed from https://github.com/openai/human-eval/blob/master/human_eval/execution.py """ with tempfile.TemporaryDirectory() as dirname: with _chdir(dirname): yield dirname
@contextlib.contextmanager def _chdir(path: str) -> Generator: """ A context manager that changes the current working directory to the given path. The implementation of this contextmanager are borrowed from https://github.com/openai/human-eval/blob/master/human_eval/execution.py """ if path == ".": yield return cwd = os.getcwd() os.chdir(path) try: yield except BaseException as exc: raise exc finally: os.chdir(cwd) def _requests_get( url: str, params: dict, headers: Optional[dict] = None, ) -> Union[dict, str]: """ Sends a GET request to the specified URL with the provided query parameters and headers. Returns the JSON response as a dictionary. This function handles the request, checks for errors, logs exceptions, and parses the JSON response. Args: url (str): The URL to which the GET request is sent. params (Dict): A dictionary containing query parameters to be included in the request. headers (Optional[Dict]): An optional dictionary of HTTP headers to send with the request. Returns: Dict or str: If the request is successful, returns a dictionary containing the parsed JSON data. If the request fails, returns the error string. """ # Make the request try: # Check if headers are provided, and include them if they are not None if headers: response = requests.get(url, params=params, headers=headers) else: response = requests.get(url, params=params) # This will raise an exception for HTTP error codes response.raise_for_status() except requests.RequestException as e: return str(e) # Parse the JSON response search_results = response.json() return search_results def _if_change_database(sql_query: str) -> bool: """Check whether SQL query only contains SELECT query""" # Compile the regex pattern outside the function for better performance pattern_unsafe_sql = re.compile( r"\b(INSERT|UPDATE|DELETE|REPLACE|CREATE|ALTER|DROP|TRUNCATE|USE)\b", re.IGNORECASE, ) # Remove SQL comments sql_query = re.sub(r"--.*?$", "", sql_query, flags=re.MULTILINE) # Remove /* */ comments sql_query = re.sub(r"/\*.*?\*/", "", sql_query, flags=re.DOTALL) # Matching non-SELECT statements with regular expressions if pattern_unsafe_sql.search(sql_query): return False return True def _get_timestamp( format_: str = "%Y-%m-%d %H:%M:%S", time: datetime.datetime = None, ) -> str: """Get current timestamp.""" if time is None: return datetime.datetime.now().strftime(format_) else: return time.strftime(format_)
[docs] def to_openai_dict(item: dict) -> dict: """Convert `Msg` to `dict` for OpenAI API.""" clean_dict = {} if "name" in item: clean_dict["name"] = item["name"] if "role" in item: clean_dict["role"] = item["role"] else: clean_dict["role"] = "assistant" if "content" in item: clean_dict["content"] = _convert_to_str(item["content"]) else: raise ValueError("The content of the message is missing.") return clean_dict
def _find_available_port() -> int: """Get an unoccupied socket port number.""" with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(("", 0)) return s.getsockname()[1] def _check_port(port: Optional[int] = None) -> int: """Check if the port is available. Args: port (`int`): the port number being checked. Returns: `int`: the port number that passed the check. If the port is found to be occupied, an available port number will be automatically returned. """ if port is None: new_port = _find_available_port() return new_port with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: try: if s.connect_ex(("localhost", port)) == 0: raise RuntimeError("Port is occupied.") except Exception: new_port = _find_available_port() return new_port return port def _guess_type_by_extension( url: str, ) -> Literal["image", "audio", "video", "file"]: """Guess the type of the file by its extension.""" extension = url.split(".")[-1].lower() if extension in [ "bmp", "dib", "icns", "ico", "jfif", "jpe", "jpeg", "jpg", "j2c", "j2k", "jp2", "jpc", "jpf", "jpx", "apng", "png", "bw", "rgb", "rgba", "sgi", "tif", "tiff", "webp", ]: return "image" elif extension in [ "amr", "wav", "3gp", "3gpp", "aac", "mp3", "flac", "ogg", ]: return "audio" elif extension in [ "mp4", "webm", "mkv", "flv", "avi", "mov", "wmv", "rmvb", ]: return "video" else: return "file" def _to_openai_image_url(url: str) -> str: """Convert an image url to openai format. If the given url is a local file, it will be converted to base64 format. Otherwise, it will be returned directly. Args: url (`str`): The local or public url of the image. """ # See https://platform.openai.com/docs/guides/vision for details of # support image extensions. support_image_extensions = ( ".png", ".jpg", ".jpeg", ".gif", ".webp", ) parsed_url = urlparse(url) lower_url = url.lower() # Web url if parsed_url.scheme != "": if any(lower_url.endswith(_) for _ in support_image_extensions): return url # Check if it is a local file elif os.path.exists(url) and os.path.isfile(url): if any(lower_url.endswith(_) for _ in support_image_extensions): with open(url, "rb") as image_file: base64_image = base64.b64encode(image_file.read()).decode( "utf-8", ) extension = parsed_url.path.lower().split(".")[-1] mime_type = f"image/{extension}" return f"data:{mime_type};base64,{base64_image}" raise TypeError(f"{url} should be end with {support_image_extensions}.") def _download_file(url: str, path_file: str, max_retries: int = 3) -> bool: """Download file from the given url and save it to the given path. Args: url (`str`): The url of the file. path_file (`str`): The path to save the file. max_retries (`int`, defaults to `3`) The maximum number of retries when fail to download the file. """ for n_retry in range(1, max_retries + 1): response = requests.get(url, stream=True) if response.status_code == requests.codes.ok: with open(path_file, "wb") as file: for chunk in response.iter_content(1024): file.write(chunk) return True else: raise RuntimeError( f"Failed to download file from {url} (status code: " f"{response.status_code}). Retry {n_retry}/{max_retries}.", ) return False def _generate_random_code( length: int = 6, uppercase: bool = True, lowercase: bool = True, digits: bool = True, ) -> str: """Get random code.""" characters = "" if uppercase: characters += string.ascii_uppercase if lowercase: characters += string.ascii_lowercase if digits: characters += string.digits return "".join(secrets.choice(characters) for i in range(length)) def _generate_id_from_seed(seed: str, length: int = 8) -> str: """Generate random id from seed str. Args: seed (`str`): seed string. length (`int`): generated id length. """ hasher = hashlib.sha256() hasher.update(seed.encode("utf-8")) hash_digest = hasher.hexdigest() random.seed(hash_digest) id_chars = [ random.choice(string.ascii_letters + string.digits) for _ in range(length) ] return "".join(id_chars) def _is_web_url(url: str) -> bool: """Whether the url is accessible from the Web. Args: url (`str`): The url to check. Note: This function is not perfect, it only checks if the URL starts with common web protocols, e.g., http, https, ftp, oss. """ parsed_url = urlparse(url) return parsed_url.scheme in ["http", "https", "ftp", "oss"] def _is_json_serializable(obj: Any) -> bool: """Check if the given object is json serializable.""" try: json.dumps(obj) return True except TypeError: return False def _convert_to_str(content: Any) -> str: """Convert the content to string. Note: For prompt engineering, simply calling `str(content)` or `json.dumps(content)` is not enough. - For `str(content)`, if `content` is a dictionary, it will turn double quotes to single quotes. When this string is fed into prompt, the LLMs may learn to use single quotes instead of double quotes (which cannot be loaded by `json.loads` API). - For `json.dumps(content)`, if `content` is a string, it will add double quotes to the string. LLMs may learn to use double quotes to wrap strings, which leads to the same issue as `str(content)`. To avoid these issues, we use this function to safely convert the content to a string used in prompt. Args: content (`Any`): The content to be converted. Returns: `str`: The converted string. """ if isinstance(content, str): return content elif isinstance(content, (dict, list, int, float, bool, tuple)): return json.dumps(content, ensure_ascii=False) else: return str(content) def _join_str_with_comma_and(elements: List[str]) -> str: """Return the JSON string with comma, and use " and " between the last two elements.""" if len(elements) == 0: return "" elif len(elements) == 1: return elements[0] elif len(elements) == 2: return " and ".join(elements) else: return ", ".join(elements[:-1]) + f", and {elements[-1]}"
[docs] class ImportErrorReporter: """Used as a placeholder for missing packages. When called, an ImportError will be raised, prompting the user to install the specified extras requirement. """
[docs] def __init__(self, error: ImportError, extras_require: str = None) -> None: """Init the ImportErrorReporter. Args: error (`ImportError`): the original ImportError. extras_require (`str`): the extras requirement. """ self.error = error self.extras_require = extras_require
def __call__(self, *args: Any, **kwds: Any) -> Any: return self._raise_import_error() def __getattr__(self, name: str) -> Any: return self._raise_import_error() def __getitem__(self, __key: Any) -> Any: return self._raise_import_error() def _raise_import_error(self) -> Any: """Raise the ImportError""" err_msg = f"ImportError occorred: [{self.error.msg}]." if self.extras_require is not None: err_msg += ( f" Please install [{self.extras_require}] version" " of agentscope." ) raise ImportError(err_msg)
def _hash_string( data: str, hash_method: Literal["sha256", "md5", "sha1"], ) -> str: """Hash the string data.""" hash_func = getattr(hashlib, hash_method)() hash_func.update(data.encode()) return hash_func.hexdigest() def _get_process_creation_time() -> datetime.datetime: """Get the creation time of the process.""" pid = os.getpid() # Find the process by pid current_process = psutil.Process(pid) # Obtain the process creation time create_time = current_process.create_time() # Change the timestamp to a readable format return datetime.datetime.fromtimestamp(create_time) def _is_process_alive( pid: int, create_time_str: str, create_time_format: str = "%Y-%m-%d %H:%M:%S", tolerance_seconds: int = 10, ) -> bool: """Check if the process is alive by comparing the actual creation time of the process with the given creation time. Args: pid (`int`): The process id. create_time_str (`str`): The given creation time string. create_time_format (`str`, defaults to `"%Y-%m-%d %H:%M:%S"`): The format of the given creation time string. tolerance_seconds (`int`, defaults to `10`): The tolerance seconds for comparing the actual creation time with the given creation time. Returns: `bool`: True if the process is alive, False otherwise. """ try: # Try to create a process object by pid proc = psutil.Process(pid) # Obtain the actual creation time of the process actual_create_time_timestamp = proc.create_time() # Convert the given creation time string to a datetime object given_create_time_datetime = datetime.datetime.strptime( create_time_str, create_time_format, ) # Calculate the time difference between the actual creation time and time_difference = abs( actual_create_time_timestamp - given_create_time_datetime.timestamp(), ) # Compare the actual creation time with the given creation time if time_difference <= tolerance_seconds: return True except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess): # If the process is not found, access is denied, or the process is a # zombie process, return False return False return False def _is_windows() -> bool: """Check if the system is Windows.""" return os.name == "nt" def _map_string_to_color_mark( target_str: str, ) -> Tuple[str, str]: """Map a string into an index within a given length. Args: target_str (`str`): The string to be mapped. Returns: `Tuple[str, str]`: A color marker tuple """ color_marks = [ ("\033[90m", "\033[0m"), ("\033[91m", "\033[0m"), ("\033[92m", "\033[0m"), ("\033[93m", "\033[0m"), ("\033[94m", "\033[0m"), ("\033[95m", "\033[0m"), ("\033[96m", "\033[0m"), ("\033[97m", "\033[0m"), ] hash_value = int(hashlib.sha256(target_str.encode()).hexdigest(), 16) index = hash_value % len(color_marks) return color_marks[index] def _generate_new_runtime_id() -> str: """Generate a new random runtime id.""" _RUNTIME_ID_FORMAT = "run_%Y%m%d-%H%M%S_{}" return _get_timestamp(_RUNTIME_ID_FORMAT).format( _generate_random_code(uppercase=False), )