# -*- coding: utf-8 -*-
"""The env module."""
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Any, List, Callable
from concurrent.futures import ThreadPoolExecutor
import inspect
from loguru import logger
from ..exception import (
EnvNotFoundError,
EnvAlreadyExistError,
)
from .event import Event
from ..rpc.rpc_meta import RpcMeta, sync_func
[docs]
def trigger_listener(env: "Env", event: Event) -> None:
    """Run every listener bound to an event and wait for all of them.

    The listeners returned by ``env.get_listeners(event.name)`` are each
    submitted to a thread pool as ``listener(env, event)``. The call blocks
    until every listener has finished.

    Args:
        env (`Env`): The env whose event function was called; it is passed
            to each listener as the first argument.
        event (`Event`): The event information; its ``name`` selects which
            listeners are triggered.

    Raises:
        Exception: Whatever a listener raised. Results are collected in
            submission order, so the first failing listener (in that
            order) determines the exception that propagates.
    """
    with ThreadPoolExecutor() as pool:
        pending = [
            pool.submit(callback, env, event)
            for callback in env.get_listeners(event.name)
        ]
    # Leaving the pool context has already waited for every task; calling
    # `result()` here only surfaces exceptions raised inside listeners.
    for task in pending:
        task.result()
[docs]
def event_func(func: Callable) -> Callable:
    """A decorator to register an event function in `Env` and its
    subclasses.

    Note:
        This decorator is only available in the subclasses of `Env`.
        If a function is decorated with `@event_func`, at the end of
        the function, all listeners bound to the function will be
        triggered automatically.

    Args:
        func (`Callable`): The event function. It must declare a parameter
            named ``self``, which is used as the env handed to listeners.

    Returns:
        `Callable`: The decorated event function. It keeps the name,
        docstring and other metadata of `func`.

    Raises:
        TypeError: When the wrapper is called with arguments that do not
            match the signature of `func`.
        KeyError: When `func` has no parameter named ``self``.
    """
    # Local import keeps this block self-contained (no new module-level
    # dependency is required).
    from functools import wraps

    # The signature of `func` never changes, so resolve it once at
    # decoration time instead of on every call.
    sig = inspect.signature(func)

    @wraps(func)
    def wrapper(  # type: ignore[no-untyped-def]
        *args,
        **kwargs,
    ) -> Any:
        # get the dict format args of the decorated function
        bound_args = sig.bind(*args, **kwargs)
        bound_args.apply_defaults()
        args_dict = bound_args.arguments
        # call the function
        returns = func(*args, **kwargs)
        # `self` is the env instance; it is passed separately and must not
        # appear among the event arguments.
        self = args_dict.pop("self")
        trigger_listener(
            env=self,
            event=Event(
                name=func.__name__,
                args=args_dict,
                returns=returns,
            ),
        )
        return returns

    return wrapper
[docs]
class EventListener(ABC):
    """Abstract base for objects that react to an env's events.

    Subclasses put their behavior in `__call__`, which receives:

    - env (`Env`): The environment instance that triggered the listener.
    - event (`Event`): The event information, which contains the event
      function name (`name`: `str`), the arguments of the event function
      (`args`: `dict`), and the return value of the event function
      (`returns`: `Any`).

    Note:
        `EventListener` can only be bound to event functions (decorated
        with `@event_func`).
    """

    def __init__(self, name: str) -> None:
        """Create a listener.

        Args:
            name (`str`): The name of the listener.
        """
        self.name = name

    @abstractmethod
    def __call__(
        self,
        env: Env,
        event: Event,
    ) -> None:
        """Run the listener's action.

        Args:
            env (`Env`): The env bound to the listener.
            event (`Event`): The event information.
        """
[docs]
class Env(ABC, metaclass=RpcMeta):
    """The Env Interface.

    `Env` is a key concept of AgentScope, representing global
    data shared among agents.

    Each env has its own name and value, and multiple envs can
    be organized into a tree structure, where each env can have
    multiple children envs.

    Different implementations of envs may have different event
    functions, which are marked by `@event_func`.
    Users can bind `EventListener` to specific event functions,
    and the listener will be activated when the event function
    is called.
    """

    @property
    @abstractmethod
    def name(self) -> str:
        """The env's name.

        Returns:
            `str`: The name of the env.
        """

    @abstractmethod
    def get_children(self) -> dict[str, Env]:
        """Return the children envs of this env.

        Returns:
            `dict[str, Env]`: The children envs.
        """

    @abstractmethod
    def add_child(self, child: Env) -> bool:
        """Attach a child env to this env.

        Args:
            child (`Env`): The child env to add.

        Returns:
            `bool`: Whether the child was added successfully.
        """

    @abstractmethod
    def remove_child(self, children_name: str) -> bool:
        """Detach a child env from this env.

        Args:
            children_name (`str`): The name of the child env.

        Returns:
            `bool`: Whether the child was removed successfully.
        """

    @abstractmethod
    def add_listener(self, target_event: str, listener: EventListener) -> bool:
        """Bind a listener to one of this env's event functions.

        Args:
            target_event (`str`): The event function to listen to.
            listener (`EventListener`): The listener to add.

        Returns:
            `bool`: Whether the listener was added successfully.
        """

    @abstractmethod
    def remove_listener(self, target_event: str, listener_name: str) -> bool:
        """Unbind a listener from one of this env's event functions.

        Args:
            target_event (`str`): The event function.
            listener_name (`str`): The name of the listener to remove.

        Returns:
            `bool`: Whether the listener was removed successfully.
        """

    @abstractmethod
    def get_listeners(self, target_event: str) -> List[EventListener]:
        """Return the listeners bound to a specific event.

        Args:
            target_event (`str`): The event name.

        Returns:
            `List[EventListener]`: The listeners of the specific event.
        """

    @sync_func
    @abstractmethod
    def __getitem__(self, env_name: str) -> Env:
        """Get a child env by name."""

    @abstractmethod
    def __setitem__(self, env_name: str, env: Env) -> None:
        """Set a child env under the given name."""

    @abstractmethod
    def describe(self, **kwargs: Any) -> str:
        """Describe the current state of the environment."""
[docs]
class BasicEnv(Env):
    """A basic implementation of Env, which has no event function
    and cannot get value.

    Children are kept in a dict keyed by name, and listeners in a
    two-level dict: event name -> listener name -> listener.

    Note:
        `BasicEnv` is used as the base class to implement other
        envs. Application developers should not use this class.
    """

    def __init__(
        self,
        name: str,
        listeners: dict[str, List[EventListener]] = None,
        children: List[Env] = None,
    ) -> None:
        """Init a BasicEnv instance.

        Args:
            name (`str`): The name of the env.
            listeners (`dict[str, List[EventListener]]`, optional): The
                listener dict, mapping an event name to its listeners. A
                single `EventListener` is also accepted as a value.
                Defaults to None.
            children (`List[Env]`, optional): A list of children
                envs. Defaults to None.
        """
        self._name = name
        child_map = {}
        for child in children or []:
            # A later child with the same name replaces an earlier one.
            child_map[child.name] = child
        self.children = child_map
        self.event_listeners = {}
        for target_func, bound in (listeners or {}).items():
            # Normalize a lone listener into a one-element group.
            group = [bound] if isinstance(bound, EventListener) else bound
            for ls in group:
                self.add_listener(target_func, ls)

    @property
    def name(self) -> str:
        """Name of the env"""
        return self._name

    def get_children(self) -> dict[str, Env]:
        """Return the children envs of this env.

        Returns:
            `dict[str, Env]`: The children envs, keyed by name. This is
            the internal dict itself, not a copy.
        """
        return self.children

    def add_child(self, child: Env) -> bool:
        """Attach a child env to this env.

        Args:
            child (`Env`): The child env to add.

        Returns:
            `bool`: True if the child was added; False if a child with the
            same name already exists.
        """
        if child.name not in self.children:
            self.children[child.name] = child
            return True
        return False

    def remove_child(self, children_name: str) -> bool:
        """Detach a child env from this env.

        Args:
            children_name (`str`): The name of the child env.

        Returns:
            `bool`: True if the child was removed; False if no child has
            that name.
        """
        if children_name not in self.children:
            return False
        del self.children[children_name]
        return True

    def add_listener(self, target_event: str, listener: EventListener) -> bool:
        """Bind a listener to an event of this env.

        Args:
            target_event (`str`): The name of the event to listen to. It
                must be the name of an attribute of this env.
            listener (`EventListener`): The listener to add.

        Returns:
            `bool`: True if the listener was added; False (with a warning
            logged) if the event does not exist or a listener with the
            same name is already bound to it.
        """
        if not hasattr(self, target_event):
            logger.warning(f"Event {target_event} does not exist")
            return False
        registered = self.event_listeners.setdefault(target_event, {})
        if listener.name in registered:
            logger.warning(
                f"Listener {listener.name} already "
                f"exists in {target_event}",
            )
            return False
        registered[listener.name] = listener
        return True

    def remove_listener(self, target_event: str, listener_name: str) -> bool:
        """Unbind a listener from an event of this env.

        Args:
            target_event (`str`): The event name.
            listener_name (`str`): The name of the listener to remove.

        Returns:
            `bool`: True if the listener was removed; False (with a
            warning logged) if the event has no listener table or the
            listener is not bound to it.
        """
        if target_event not in self.event_listeners:
            logger.warning(f"Event {target_event} does not exist")
            return False
        registered = self.event_listeners[target_event]
        if listener_name not in registered:
            logger.warning(
                f"Listener {listener_name} does not"
                f" exist in {target_event}",
            )
            return False
        del registered[listener_name]
        return True

    def get_listeners(self, target_event: str) -> List[EventListener]:
        """Return the listeners bound to a specific event.

        Args:
            target_event (`str`): The event name.

        Returns:
            `List[EventListener]`: A new list of the listeners bound to
            the event; empty if none are bound.
        """
        return list(self.event_listeners.get(target_event, {}).values())

    def describe(self, **kwargs: Any) -> str:
        """Describe the current state of the environment.

        Raises:
            NotImplementedError: Always; `BasicEnv` has no description.
        """
        raise NotImplementedError(
            "`describe` is not implemented in `BasicEnv`.",
        )

    def __getitem__(self, env_name: str) -> Env:
        """Get a child env by name.

        Raises:
            EnvNotFoundError: If no child has that name.
        """
        if env_name not in self.children:
            raise EnvNotFoundError(env_name)
        return self.children[env_name]

    def __setitem__(self, env_name: str, env: Env) -> None:
        """Set a child env under the given name.

        Raises:
            TypeError: If `env` is not an `Env`.
            EnvAlreadyExistError: If a child with that name already exists.
        """
        if not isinstance(env, Env):
            raise TypeError("Only Env can be set")
        if env_name in self.children:
            raise EnvAlreadyExistError(env_name)
        self.children[env_name] = env
        logger.debug(f"Set Env[{env_name}] as child of Env[{self.name}]")