agentscope.service.execute_code.exec_python 源代码

# -*- coding: utf-8 -*-
"""Service to execute python code."""
import builtins
import contextlib
import inspect
import io
import multiprocessing
import os
import platform
import re
import shutil
import subprocess
import sys
import traceback
from hashlib import md5
from typing import Optional, Union, Tuple

from loguru import logger

try:
    import docker
    from docker.errors import APIError, ImageNotFound
except ImportError:
    docker = None
try:
    import resource
except (ModuleNotFoundError, ImportError):
    resource = None

from ...utils.common import create_tempdir, timer
from ..service_status import ServiceExecStatus
from ..service_response import ServiceResponse
from ...constants import (
    _DEFAULT_PYPI_MIRROR,
    _DEFAULT_TRUSTED_HOST,
)


def execute_python_code(
    code: str,
    timeout: Optional[Union[int, float]] = 300,
    use_docker: Optional[Union[bool, str]] = None,
    maximum_memory_bytes: Optional[int] = None,
) -> ServiceResponse:
    """
    Run a string of Python code and report what it printed.

    The code is executed either inside a Docker container or directly in
    the host's Python environment, depending on `use_docker`.

    WARNING: With `use_docker` set to `False` the `code` runs directly on
    the host system. That is a security risk for untrusted code, so only
    disable Docker when you are confident the code is safe.

    Args:
        code (`str`):
            The Python code to be executed.
        timeout (`Optional[Union[int, float]]`, defaults to `300`):
            Maximum number of seconds the code may run before it is
            interrupted. `None` means no time limit.
        use_docker (`Optional[Union[bool, str]]`, defaults to `None`):
            `False` runs the code in the system's native Python
            environment. `True` runs it in a Docker container. A string is
            treated as the name of the Docker image to use. `None` picks
            Docker when the `docker` Python package could be imported and
            the native environment otherwise.
        maximum_memory_bytes (`Optional[int]`, defaults to `None`):
            Memory limit in bytes for the execution. `None` imposes no
            limit.

    Returns:
        `ServiceResponse`: The response produced by the selected backend;
        its content carries the captured standard output and, on failure,
        the captured standard error as well.

    Note:
        IPython-specific operations such as `plt.show()` for displaying
        matplotlib plots are not supported, because the execution
        environment is non-interactive.
        The `timeout` argument has no effect on Windows, since
        `signal.setitimer` is only available on Unix.
    """
    if use_docker is None:
        # Nothing requested explicitly: use Docker exactly when the
        # `docker` package was importable.
        use_docker = docker is not None

    if not use_docker:
        return _execute_python_code_sys(
            code,
            timeout,
            maximum_memory_bytes,
        )

    return _execute_python_code_docker(
        code,
        timeout,
        use_docker,
        maximum_memory_bytes,
    )
def _sys_execute(
    code: str,
    shared_list: list,
    maximum_memory_bytes: int,
    timeout: int,
) -> None:
    """
    Execute the given Python code in a guarded environment, capturing its
    standard output and standard error.

    Parameters:
        code (str): The Python code to be executed.
        shared_list (ListProxy): A list proxy managed by a
            multiprocessing.Manager. Three items are appended to it:
            the captured stdout, the captured stderr and a success flag.
        maximum_memory_bytes (int): The maximum amount of memory in bytes
            that the execution is allowed to use.
        timeout (int): The maximum amount of time in seconds that the code
            is allowed to run.

    Returns:
        None: The results are reported through `shared_list` only.
    """
    is_success = False
    output_buffer, error_buffer = io.StringIO(), io.StringIO()

    with create_tempdir():
        # `sys_python_guard` replaces these functions with `None`; keep
        # references so they can be put back before the temporary
        # directory is cleaned up.
        rmtree = shutil.rmtree
        rmdir = os.rmdir
        chdir = os.chdir

        sys_python_guard(maximum_memory_bytes)

        try:
            with timer(timeout), contextlib.redirect_stdout(
                output_buffer,
            ), contextlib.redirect_stderr(error_buffer):
                try:
                    # NOTE: running arbitrary code is the purpose of this
                    # service; the caller is warned about the risk. A
                    # single fresh namespace is used for globals and
                    # locals so that functions defined by `code` can see
                    # its own top-level imports and variables, and so
                    # that `code` cannot clobber this function's locals.
                    # `__name__` mirrors running the code as a script.
                    exec(code, {"__name__": "__main__"})
                    is_success = True
                except SystemExit as exit_request:
                    # `sys.exit()` must not take the worker down before
                    # it has reported its result.
                    is_success = exit_request.code in (None, 0)
                    if not is_success:
                        error_buffer.write(traceback.format_exc())
                except Exception:
                    error_buffer.write(traceback.format_exc())
        finally:
            # Needed for cleaning up, whatever happened above.
            shutil.rmtree = rmtree
            os.rmdir = rmdir
            os.chdir = chdir

    shared_list.extend(
        [output_buffer.getvalue(), error_buffer.getvalue(), is_success],
    )


def _execute_python_code_sys(
    code: str = "",
    timeout: Optional[Union[int, float]] = None,
    maximum_memory_bytes: Optional[int] = None,
) -> ServiceResponse:
    """
    Execute string of python code in system environments.

    The code runs in a separate process (see `_sys_execute`) and this
    function blocks until that process has exited.

    WARNING: This function is designed to execute code generated by models
    that have not been explicitly trusted. The likelihood of such code
    being maliciously harmful is low, yet there exists a risk of unintended
    destructive behavior arising from the model's limitations or
    misalignment.

    Args:
        code (`str`): The Python code to be executed.
        timeout (`Optional[Union[int, float]]`): Time limit in seconds that
            is handed to the worker process; `None` for no limit.
        maximum_memory_bytes (`Optional[int]`): Memory limit in bytes that
            is handed to the worker process; `None` for no limit.

    Returns:
        `ServiceResponse`: `SUCCESS` with the captured stdout, or `ERROR`
        with the captured stdout and stderr. `ERROR` is also returned when
        the worker process ends without reporting a result.
    """
    logger.warning(
        "Executing code in system environments. There exists a risk of "
        "unintended destructive behavior. Please consider using a "
        "containerized environment.",
    )

    # The manager owns a server process; the context manager shuts it down
    # once the results have been copied out of the proxy.
    with multiprocessing.Manager() as manager:
        shared_list = manager.list()
        p = multiprocessing.Process(
            target=_sys_execute,
            args=(
                code,
                shared_list,
                maximum_memory_bytes,
                timeout,
            ),
        )
        p.start()
        # join() without a timeout only returns after the child exited.
        p.join()
        exit_code = p.exitcode
        results = list(shared_list)

    if len(results) < 3:
        # The worker died (e.g. it was killed or hit a resource limit)
        # before it could append its result.
        return ServiceResponse(
            status=ServiceExecStatus.ERROR,
            content=(
                "The execution process exited abnormally "
                f"(exit code {exit_code}) before reporting a result."
            ),
        )

    output, error, status = results[:3]
    if status:
        return ServiceResponse(
            status=ServiceExecStatus.SUCCESS,
            content=output,
        )
    else:
        return ServiceResponse(
            status=ServiceExecStatus.ERROR,
            content=f"{output}\n{error}",
        )


def _execute_python_code_docker(
    code: str = "",
    timeout: Optional[Union[int, float]] = None,
    use_docker: Optional[Union[bool, str]] = True,
    maximum_memory_bytes: Optional[int] = None,
) -> ServiceResponse:
    """
    Execute string of python code in containerized environments.

    If ImportErrors occur, this function will attempt to install the
    missing packages and retry execution until no ImportErrors are found
    or until execution succeeds.

    Args:
        code (`str`): The Python code to be executed.
        timeout (`Optional[Union[int, float]]`): Time limit in seconds
            applied inside the container; `None` for no limit.
        use_docker (`Optional[Union[bool, str]]`): `True` selects the
            default image `python:3.9.12`; any other value is used as the
            image name.
        maximum_memory_bytes (`Optional[int]`): Memory limit in bytes for
            the container; `None` for no limit.

    Returns:
        `ServiceResponse`: `SUCCESS` with the container's stdout when it
        exits with status 0, otherwise `ERROR` with stdout and stderr.
    """
    if docker is None:
        return ServiceResponse(
            status=ServiceExecStatus.ERROR,
            content=(
                "The `docker` Python package is not installed, so the "
                "code cannot be executed in a Docker container."
            ),
        )

    def docker_execute(
        exec_code: str,
        max_retries: int = 5,
    ) -> Tuple:
        """Helper function to execute code inside the container.

        Returns a `(stdout, stderr, is_success)` tuple.
        """
        missing_modules = []
        # Initialised up front so that a failure before the first
        # container finished still yields a well-formed result.
        docker_out, docker_err = "", ""
        is_success = False

        # Ship the source of `timer` into the script so the time limit is
        # enforced inside the container.
        timer_code = str(inspect.getsource(timer))

        # `exec_code` is embedded through `repr` so that quotes and
        # backslash escapes in the user's code survive unchanged.
        exec_code_with_timer = (
            "import contextlib, signal\n"
            "from typing import Any, Generator, Optional, Union\n"
            f"{timer_code}\n"
            f"with timer({timeout}):\n"
            f"    exec({exec_code!r})\n"
        )

        # Create a temporary file to store the commands to run. It lives
        # in the current directory, which is mounted at /app below.
        code_hash = md5(code.encode()).hexdigest()
        file_name = f"tmp_code_{code_hash}.py"
        with open(file_name, "w", encoding="utf-8") as exec_code_file:
            exec_code_file.write(exec_code_with_timer)

        try:
            for _ in range(max_retries):
                script_command = f"python /app/{file_name}"
                if missing_modules:
                    install_command = (
                        f"pip install -q {' '.join(missing_modules)} -i"
                        f" {_DEFAULT_PYPI_MIRROR} "
                        f"--trusted-host {_DEFAULT_TRUSTED_HOST}"
                    )
                    # `&&` is shell syntax, so the chained command has to
                    # be handed to a shell explicitly.
                    docker_command = [
                        "sh",
                        "-c",
                        f"{install_command} && {script_command}",
                    ]
                else:
                    docker_command = script_command

                container = client.containers.run(
                    command=docker_command,
                    volumes={os.getcwd(): {"bind": "/app", "mode": "rw"}},
                    working_dir="/app",
                    **run_args,
                )
                try:
                    wait_response = container.wait()
                    docker_out = container.logs(
                        stdout=True,
                        stderr=False,
                    ).decode("utf-8", errors="replace")
                    docker_err = container.logs(
                        stdout=False,
                        stderr=True,
                    ).decode("utf-8", errors="replace")
                finally:
                    # Do not leave one stopped container behind per run.
                    try:
                        container.remove(force=True)
                    except APIError as remove_error:
                        logger.warning(
                            f"Failed to remove container: {remove_error}",
                        )

                is_success = wait_response.get("StatusCode", None) == 0

                # Check for ImportError or ModuleNotFoundError in stderr
                if (
                    "ImportError" not in docker_err
                    and "ModuleNotFoundError" not in docker_err
                ):
                    break

                # Extract the name of the missing module
                missing_module_match = re.search(
                    r"No module named '(\w+)'",
                    docker_err,
                )
                if not missing_module_match:
                    # If a missing module cannot be determined, do not
                    # retry
                    break

                module_name = missing_module_match.group(1)
                if module_name in missing_modules:
                    # Installing it did not help; retrying is pointless.
                    break
                missing_modules.append(module_name)
        except Exception as e:
            logger.error(e)
            is_success = False
            docker_err = f"{docker_err}\n{e}" if docker_err else str(e)
        finally:
            # Clean up the temporary file
            if os.path.exists(file_name):
                os.remove(file_name)

        return docker_out, docker_err, is_success

    client = docker.from_env()  # Initialize Docker client

    # Step 1. Pull images & enter images
    image_name = "python:3.9.12" if use_docker is True else use_docker

    # Check if the image exists locally before pulling
    local_images = [
        tag for image in client.images.list() for tag in image.tags
    ]
    if image_name not in local_images:
        try:
            # Pull the image if it does not exist locally
            client.images.pull(image_name)
        except (ImageNotFound, APIError) as e:
            return ServiceResponse(
                status=ServiceExecStatus.ERROR,
                content=f"Failed to pull Docker image: {e}",
            )

    # Step 2. Execute code and catch Import Error and re-install
    # These arguments are forwarded to every `containers.run` call made
    # by `docker_execute`.
    run_args = {
        "image": image_name,
        "detach": True,
        "network_disabled": False,
    }
    if maximum_memory_bytes is not None:
        run_args["mem_limit"] = maximum_memory_bytes

    # Try to execute the code and retry if ImportErrors are encountered
    output, error, status = docker_execute(code)

    if status:
        return ServiceResponse(
            status=ServiceExecStatus.SUCCESS,
            content=output,
        )
    else:
        return ServiceResponse(
            status=ServiceExecStatus.ERROR,
            content=f"{output}\n{error}",
        )
def sys_python_guard(maximum_memory_bytes: Optional[int] = None) -> None:
    """
    This disables various destructive functions and prevents the generated
    code from interfering with the test (e.g. fork bomb, killing other
    processes, removing filesystem files, etc.)

    The listed functions are replaced with `None` on the `builtins`, `os`,
    `shutil` and `subprocess` modules, and the listed module names are
    mapped to `None` in `sys.modules`. These changes affect the whole
    current process and are not undone by this function.

    The implementation of this function are modified from
    https://github.com/openai/human-eval/blob/master/human_eval/execution.py

    Args:
        maximum_memory_bytes (`Optional[int]`, defaults to `None`):
            When given, and when the `resource` module is available, it is
            applied as both the soft and the hard limit for the address
            space, the data segment and (except on macOS) the stack.
    """
    if resource is not None and maximum_memory_bytes is not None:
        limits = (maximum_memory_bytes, maximum_memory_bytes)
        resource.setrlimit(resource.RLIMIT_AS, limits)
        resource.setrlimit(resource.RLIMIT_DATA, limits)
        if platform.uname().system != "Darwin":
            resource.setrlimit(resource.RLIMIT_STACK, limits)

    # Must happen before `os.putenv` is disabled below, since updating
    # `os.environ` relies on it in CPython.
    os.environ["OMP_NUM_THREADS"] = "1"

    # Disable builtins functions. `help` is patched through the `builtins`
    # module as well: subscripting `__builtins__` only works where it
    # happens to be a dict, which is a CPython implementation detail.
    builtins_funcs_to_disable = ["exit", "quit", "help"]
    for func_name in builtins_funcs_to_disable:
        setattr(builtins, func_name, None)

    # Disable os functions
    os_funcs_to_disable = [
        "kill",
        "system",
        "putenv",
        "remove",
        "removedirs",
        "rmdir",
        "fchdir",
        "setuid",
        "fork",
        "forkpty",
        "killpg",
        "rename",
        "renames",
        "truncate",
        "replace",
        "unlink",
        "fchmod",
        "fchown",
        "chmod",
        "chown",
        "chroot",
        "lchflags",
        "lchmod",
        "lchown",
        "getcwd",
        "chdir",
    ]
    for func_name in os_funcs_to_disable:
        setattr(os, func_name, None)

    # Disable shutil functions
    shutil_funcs_to_disable = ["rmtree", "move", "chown"]
    for func_name in shutil_funcs_to_disable:
        setattr(shutil, func_name, None)

    # Disable subprocess functions
    subprocess_funcs_to_disable = ["Popen"]
    for func_name in subprocess_funcs_to_disable:
        setattr(subprocess, func_name, None)

    # Disable sys modules: a `None` entry makes `import <name>` fail.
    sys_modules_to_disable = [
        "ipdb",
        "joblib",
        "resource",
        "psutil",
        "tkinter",
    ]
    for module_name in sys_modules_to_disable:
        sys.modules[module_name] = None