# -*- coding: utf-8 -*-
"""Workflow node opt."""
from abc import ABC, abstractmethod
from enum import IntEnum
from functools import partial
from typing import List, Optional
from agentscope import msghub
from agentscope.agents import (
DialogAgent,
UserAgent,
DictDialogAgent,
ReActAgent,
)
from agentscope.manager import ModelManager
from agentscope.message import Msg
from agentscope.pipelines import (
SequentialPipeline,
ForLoopPipeline,
WhileLoopPipeline,
IfElsePipeline,
SwitchPipeline,
)
from agentscope.pipelines.functional import placeholder
from agentscope.web.workstation.workflow_utils import (
kwarg_converter,
deps_converter,
dict_converter,
)
from agentscope.service import (
bing_search,
google_search,
read_text_file,
write_text_file,
execute_python_code,
ServiceToolkit,
)
DEFAULT_FLOW_VAR = "flow"
[docs]
class WorkflowNodeType(IntEnum):
"""Enum for workflow node."""
MODEL = 0
AGENT = 1
PIPELINE = 2
SERVICE = 3
MESSAGE = 4
COPY = 5
[docs]
class WorkflowNode(ABC):
"""
Abstract base class representing a generic node in a workflow.
WorkflowNode is designed to be subclassed with specific logic implemented
in the subclass methods. It provides an interface for initialization and
execution of operations when the node is called.
"""
node_type = None
def __init__(
self,
node_id: str,
opt_kwargs: dict,
source_kwargs: dict,
dep_opts: list,
) -> None:
"""
Initialize nodes. Implement specific initialization logic in
subclasses.
"""
self.node_id = node_id
self.opt_kwargs = opt_kwargs
self.source_kwargs = source_kwargs
self.dep_opts = dep_opts
self.dep_vars = [opt.var_name for opt in self.dep_opts]
self.var_name = f"{self.node_type.name.lower()}_{self.node_id}"
def __call__(self, x: dict = None): # type: ignore[no-untyped-def]
"""
Performs the operations of the node. Implement specific logic in
subclasses.
"""
[docs]
@abstractmethod
def compile(self) -> dict:
"""
Compile Node to python executable code dict
"""
return {
"imports": "",
"inits": "",
"execs": "",
}
[docs]
class ModelNode(WorkflowNode):
"""
A node that represents a model in a workflow.
The ModelNode can be used to load and execute a model as part of the
workflow node. It initializes model configurations and performs
model-related operations when called.
"""
node_type = WorkflowNodeType.MODEL
def __init__(
self,
node_id: str,
opt_kwargs: dict,
source_kwargs: dict,
dep_opts: list,
) -> None:
super().__init__(node_id, opt_kwargs, source_kwargs, dep_opts)
ModelManager.get_instance().load_model_configs([self.opt_kwargs])
[docs]
def compile(self) -> dict:
return {
"imports": "from agentscope.manager import ModelManager",
"inits": f"ModelManager.get_instance().load_model_configs("
f"[{self.opt_kwargs}])",
"execs": "",
}
[docs]
class MsgNode(WorkflowNode):
"""
A node that manages messaging within a workflow.
MsgNode is responsible for handling messages, creating message objects,
and performing message-related operations when the node is invoked.
"""
node_type = WorkflowNodeType.MESSAGE
def __init__(
self,
node_id: str,
opt_kwargs: dict,
source_kwargs: dict,
dep_opts: list,
) -> None:
super().__init__(node_id, opt_kwargs, source_kwargs, dep_opts)
self.msg = Msg(**self.opt_kwargs)
def __call__(self, x: dict = None) -> dict:
return self.msg
[docs]
def compile(self) -> dict:
return {
"imports": "from agentscope.message import Msg",
"inits": f"{DEFAULT_FLOW_VAR} = Msg"
f"({kwarg_converter(self.opt_kwargs)})",
"execs": "",
}
[docs]
class DialogAgentNode(WorkflowNode):
"""
A node representing a DialogAgent within a workflow.
"""
node_type = WorkflowNodeType.AGENT
def __init__(
self,
node_id: str,
opt_kwargs: dict,
source_kwargs: dict,
dep_opts: list,
) -> None:
super().__init__(node_id, opt_kwargs, source_kwargs, dep_opts)
self.pipeline = DialogAgent(**self.opt_kwargs)
def __call__(self, x: dict = None) -> dict:
return self.pipeline(x)
[docs]
def compile(self) -> dict:
return {
"imports": "from agentscope.agents import DialogAgent",
"inits": f"{self.var_name} = DialogAgent("
f"{kwarg_converter(self.opt_kwargs)})",
"execs": f"{DEFAULT_FLOW_VAR} = {self.var_name}"
f"({DEFAULT_FLOW_VAR})",
}
[docs]
class UserAgentNode(WorkflowNode):
"""
A node representing a UserAgent within a workflow.
"""
node_type = WorkflowNodeType.AGENT
def __init__(
self,
node_id: str,
opt_kwargs: dict,
source_kwargs: dict,
dep_opts: list,
) -> None:
super().__init__(node_id, opt_kwargs, source_kwargs, dep_opts)
self.pipeline = UserAgent(**self.opt_kwargs)
def __call__(self, x: dict = None) -> dict:
return self.pipeline(x)
[docs]
def compile(self) -> dict:
return {
"imports": "from agentscope.agents import UserAgent",
"inits": f"{self.var_name} = UserAgent("
f"{kwarg_converter(self.opt_kwargs)})",
"execs": f"{DEFAULT_FLOW_VAR} = {self.var_name}"
f"({DEFAULT_FLOW_VAR})",
}
[docs]
class DictDialogAgentNode(WorkflowNode):
"""
A node representing a DictDialogAgent within a workflow.
"""
node_type = WorkflowNodeType.AGENT
def __init__(
self,
node_id: str,
opt_kwargs: dict,
source_kwargs: dict,
dep_opts: list,
) -> None:
super().__init__(node_id, opt_kwargs, source_kwargs, dep_opts)
self.pipeline = DictDialogAgent(**self.opt_kwargs)
def __call__(self, x: dict = None) -> dict:
return self.pipeline(x)
[docs]
def compile(self) -> dict:
return {
"imports": "from agentscope.agents import DictDialogAgent",
"inits": f"{self.var_name} = DictDialogAgent("
f"{kwarg_converter(self.opt_kwargs)})",
"execs": f"{DEFAULT_FLOW_VAR} = {self.var_name}"
f"({DEFAULT_FLOW_VAR})",
}
[docs]
class ReActAgentNode(WorkflowNode):
"""
A node representing a ReActAgent within a workflow.
"""
node_type = WorkflowNodeType.AGENT
def __init__(
self,
node_id: str,
opt_kwargs: dict,
source_kwargs: dict,
dep_opts: list,
) -> None:
super().__init__(node_id, opt_kwargs, source_kwargs, dep_opts)
# Build tools
self.service_toolkit = ServiceToolkit()
for tool in dep_opts:
if not hasattr(tool, "service_func"):
raise TypeError(f"{tool} must be tool!")
self.service_toolkit.add(tool.service_func)
self.pipeline = ReActAgent(
service_toolkit=self.service_toolkit,
**self.opt_kwargs,
)
def __call__(self, x: dict = None) -> dict:
return self.pipeline(x)
[docs]
def compile(self) -> dict:
tools = deps_converter(self.dep_vars)[1:-1].split(",")
service_toolkit_code = ";".join(
f"{self.var_name}_service_toolkit.add({tool.strip()})"
for tool in tools
)
return {
"imports": "from agentscope.agents import ReActAgent",
"inits": f"{self.var_name}_service_toolkit = ServiceToolkit()\n"
f" {service_toolkit_code}\n"
f" {self.var_name} = ReActAgent"
f"({kwarg_converter(self.opt_kwargs)}, service_toolkit"
f"={self.var_name}_service_toolkit)",
"execs": f"{DEFAULT_FLOW_VAR} = {self.var_name}"
f"({DEFAULT_FLOW_VAR})",
}
[docs]
class MsgHubNode(WorkflowNode):
"""
A node that serves as a messaging hub within a workflow.
MsgHubNode is responsible for broadcasting announcements to participants
and managing the flow of messages within a workflow's node.
"""
node_type = WorkflowNodeType.PIPELINE
def __init__(
self,
node_id: str,
opt_kwargs: dict,
source_kwargs: dict,
dep_opts: list,
) -> None:
super().__init__(node_id, opt_kwargs, source_kwargs, dep_opts)
self.announcement = Msg(
name=self.opt_kwargs["announcement"].get("name", "Host"),
content=self.opt_kwargs["announcement"].get("content", "Welcome!"),
role="system",
)
assert len(self.dep_opts) == 1 and hasattr(
self.dep_opts[0],
"pipeline",
), (
"MsgHub members must be a list of length 1, with the first "
"element being an instance of PipelineBaseNode"
)
self.pipeline = self.dep_opts[0]
self.participants = get_all_agents(self.pipeline)
self.participants_var = get_all_agents(self.pipeline, return_var=True)
def __call__(self, x: dict = None) -> dict:
with msghub(self.participants, announcement=self.announcement):
x = self.pipeline(x)
return x
[docs]
def compile(self) -> dict:
announcement = (
f'Msg(name="'
f'{self.opt_kwargs["announcement"].get("name", "Host")}", '
f'content="'
f'{self.opt_kwargs["announcement"].get("content", "Host")}"'
f', role="system")'
)
execs = f"""with msghub({deps_converter(self.participants_var)},
announcement={announcement}):
{DEFAULT_FLOW_VAR} = {self.dep_vars[0]}({DEFAULT_FLOW_VAR})
"""
return {
"imports": "from agentscope.msghub import msghub\n"
"from agentscope.message import Msg",
"inits": "",
"execs": execs,
}
[docs]
class PlaceHolderNode(WorkflowNode):
"""
A placeholder node within a workflow.
This node acts as a placeholder and can be used to pass through information
or data without performing any significant operation.
"""
node_type = WorkflowNodeType.PIPELINE
def __init__(
self,
node_id: str,
opt_kwargs: dict,
source_kwargs: dict,
dep_opts: list,
) -> None:
super().__init__(node_id, opt_kwargs, source_kwargs, dep_opts)
self.pipeline = placeholder
def __call__(self, x: dict = None) -> dict:
return self.pipeline(x)
[docs]
def compile(self) -> dict:
return {
"imports": "from agentscope.pipelines.functional import "
"placeholder",
"inits": f"{self.var_name} = placeholder",
"execs": f"{DEFAULT_FLOW_VAR} = {self.var_name}"
f"({DEFAULT_FLOW_VAR})",
}
[docs]
class SequentialPipelineNode(WorkflowNode):
"""
A node representing a sequential node within a workflow.
SequentialPipelineNode executes a series of operators or nodes in a
sequence, where the output of one node is the input to the next.
"""
node_type = WorkflowNodeType.PIPELINE
def __init__(
self,
node_id: str,
opt_kwargs: dict,
source_kwargs: dict,
dep_opts: list,
) -> None:
super().__init__(node_id, opt_kwargs, source_kwargs, dep_opts)
self.pipeline = SequentialPipeline(operators=self.dep_opts)
def __call__(self, x: dict = None) -> dict:
return self.pipeline(x)
[docs]
def compile(self) -> dict:
return {
"imports": "from agentscope.pipelines import SequentialPipeline",
"inits": f"{self.var_name} = SequentialPipeline("
f"{deps_converter(self.dep_vars)})",
"execs": f"{DEFAULT_FLOW_VAR} = {self.var_name}"
f"({DEFAULT_FLOW_VAR})",
}
[docs]
class ForLoopPipelineNode(WorkflowNode):
"""
A node representing a for-loop structure in a workflow.
ForLoopPipelineNode allows the execution of a pipeline node multiple times,
iterating over a given set of inputs or a specified range.
"""
node_type = WorkflowNodeType.PIPELINE
def __init__(
self,
node_id: str,
opt_kwargs: dict,
source_kwargs: dict,
dep_opts: list,
) -> None:
super().__init__(node_id, opt_kwargs, source_kwargs, dep_opts)
assert (
len(self.dep_opts) == 1
), "ForLoopPipelineNode can only contain one PipelineNode."
self.pipeline = ForLoopPipeline(
loop_body_operators=self.dep_opts[0],
**self.opt_kwargs,
)
def __call__(self, x: dict = None) -> dict:
return self.pipeline(x)
[docs]
def compile(self) -> dict:
return {
"imports": "from agentscope.pipelines import ForLoopPipeline",
"inits": f"{self.var_name} = ForLoopPipeline("
f"loop_body_operators="
f"{deps_converter(self.dep_vars)},"
f" {kwarg_converter(self.source_kwargs)})",
"execs": f"{DEFAULT_FLOW_VAR} = {self.var_name}"
f"({DEFAULT_FLOW_VAR})",
}
[docs]
class WhileLoopPipelineNode(WorkflowNode):
"""
A node representing a while-loop structure in a workflow.
WhileLoopPipelineNode enables conditional repeated execution of a node
node based on a specified condition.
"""
node_type = WorkflowNodeType.PIPELINE
def __init__(
self,
node_id: str,
opt_kwargs: dict,
source_kwargs: dict,
dep_opts: list,
) -> None:
super().__init__(node_id, opt_kwargs, source_kwargs, dep_opts)
assert (
len(self.dep_opts) == 1
), "WhileLoopPipelineNode can only contain one PipelineNode."
self.pipeline = WhileLoopPipeline(
loop_body_operators=self.dep_opts[0],
**self.opt_kwargs,
)
def __call__(self, x: dict = None) -> dict:
return self.pipeline(x)
[docs]
def compile(self) -> dict:
return {
"imports": "from agentscope.pipelines import WhileLoopPipeline",
"inits": f"{self.var_name} = WhileLoopPipeline("
f"loop_body_operators="
f"{deps_converter(self.dep_vars)},"
f" {kwarg_converter(self.source_kwargs)})",
"execs": f"{DEFAULT_FLOW_VAR} = {self.var_name}"
f"({DEFAULT_FLOW_VAR})",
}
[docs]
class IfElsePipelineNode(WorkflowNode):
"""
A node representing an if-else conditional structure in a workflow.
IfElsePipelineNode directs the flow of execution to different node
nodes based on a specified condition.
"""
node_type = WorkflowNodeType.PIPELINE
def __init__(
self,
node_id: str,
opt_kwargs: dict,
source_kwargs: dict,
dep_opts: list,
) -> None:
super().__init__(node_id, opt_kwargs, source_kwargs, dep_opts)
assert (
0 < len(self.dep_opts) <= 2
), "IfElsePipelineNode must contain one or two PipelineNode."
if len(self.dep_opts) == 1:
self.pipeline = IfElsePipeline(
if_body_operators=self.dep_opts[0],
**self.opt_kwargs,
)
elif len(self.dep_opts) == 2:
self.pipeline = IfElsePipeline(
if_body_operators=self.dep_opts[0],
else_body_operators=self.dep_opts[1],
**self.opt_kwargs,
)
def __call__(self, x: dict = None) -> dict:
return self.pipeline(x)
[docs]
def compile(self) -> dict:
imports = "from agentscope.pipelines import IfElsePipeline"
execs = f"{DEFAULT_FLOW_VAR} = {self.var_name}({DEFAULT_FLOW_VAR})"
if len(self.dep_vars) == 1:
return {
"imports": imports,
"inits": f"{self.var_name} = IfElsePipeline("
f"if_body_operators={self.dep_vars[0]})",
"execs": execs,
}
elif len(self.dep_vars) == 2:
return {
"imports": imports,
"inits": f"{self.var_name} = IfElsePipeline("
f"if_body_operators={self.dep_vars[0]}, "
f"else_body_operators={self.dep_vars[1]})",
"execs": execs,
}
raise ValueError
[docs]
class SwitchPipelineNode(WorkflowNode):
"""
A node representing a switch-case structure within a workflow.
SwitchPipelineNode routes the execution to different node nodes
based on the evaluation of a specified key or condition.
"""
node_type = WorkflowNodeType.PIPELINE
def __init__(
self,
node_id: str,
opt_kwargs: dict,
source_kwargs: dict,
dep_opts: list,
) -> None:
super().__init__(node_id, opt_kwargs, source_kwargs, dep_opts)
assert 0 < len(self.dep_opts), (
"SwitchPipelineNode must contain at least " "one PipelineNode."
)
case_operators = {}
self.case_operators_var = {}
if len(self.dep_opts) == len(self.opt_kwargs["cases"]):
# No default_operators provided
default_operators = placeholder
self.default_var_name = "placeholder"
elif len(self.dep_opts) == len(self.opt_kwargs["cases"]) + 1:
# default_operators provided
default_operators = self.dep_opts.pop(-1)
self.default_var_name = self.dep_vars.pop(-1)
else:
raise ValueError(
f"SwitchPipelineNode deps {self.dep_opts} not matches "
f"cases {self.opt_kwargs['cases']}.",
)
for key, value, var in zip(
self.opt_kwargs["cases"],
self.dep_opts,
self.dep_vars,
):
case_operators[key] = value.pipeline
self.case_operators_var[key] = var
self.opt_kwargs.pop("cases")
self.source_kwargs.pop("cases")
self.pipeline = SwitchPipeline(
case_operators=case_operators,
default_operators=default_operators, # type: ignore[arg-type]
**self.opt_kwargs,
)
def __call__(self, x: dict = None) -> dict:
return self.pipeline(x)
[docs]
def compile(self) -> dict:
imports = (
"from agentscope.pipelines import SwitchPipeline\n"
"from agentscope.pipelines.functional import placeholder"
)
execs = f"{DEFAULT_FLOW_VAR} = {self.var_name}({DEFAULT_FLOW_VAR})"
return {
"imports": imports,
"inits": f"{self.var_name} = SwitchPipeline(case_operators="
f"{dict_converter(self.case_operators_var)}, "
f"default_operators={self.default_var_name},"
f" {kwarg_converter(self.source_kwargs)})",
"execs": execs,
}
[docs]
class CopyNode(WorkflowNode):
"""
A node that duplicates the output of another node in the workflow.
CopyNode is used to replicate the results of a parent node and can be
useful in workflows where the same output is needed for multiple
subsequent operations.
"""
node_type = WorkflowNodeType.COPY
def __init__(
self,
node_id: str,
opt_kwargs: dict,
source_kwargs: dict,
dep_opts: list,
) -> None:
super().__init__(node_id, opt_kwargs, source_kwargs, dep_opts)
assert len(self.dep_opts) == 1, "CopyNode can only have one parent!"
self.pipeline = self.dep_opts[0]
def __call__(self, x: dict = None) -> dict:
return self.pipeline(x)
[docs]
def compile(self) -> dict:
return {
"imports": "",
"inits": "",
"execs": f"{DEFAULT_FLOW_VAR} = {self.dep_vars[0]}"
f"({DEFAULT_FLOW_VAR})",
}
[docs]
class BingSearchServiceNode(WorkflowNode):
"""
Bing Search Node
"""
node_type = WorkflowNodeType.SERVICE
def __init__(
self,
node_id: str,
opt_kwargs: dict,
source_kwargs: dict,
dep_opts: list,
) -> None:
super().__init__(node_id, opt_kwargs, source_kwargs, dep_opts)
self.service_func = partial(bing_search, **self.opt_kwargs)
[docs]
def compile(self) -> dict:
return {
"imports": "from agentscope.service import ServiceToolkit\n"
"from functools import partial\n"
"from agentscope.service import bing_search",
"inits": f"{self.var_name} = partial(bing_search,"
f" {kwarg_converter(self.opt_kwargs)})",
"execs": "",
}
[docs]
class GoogleSearchServiceNode(WorkflowNode):
"""
Google Search Node
"""
node_type = WorkflowNodeType.SERVICE
def __init__(
self,
node_id: str,
opt_kwargs: dict,
source_kwargs: dict,
dep_opts: list,
) -> None:
super().__init__(node_id, opt_kwargs, source_kwargs, dep_opts)
self.service_func = partial(google_search, **self.opt_kwargs)
[docs]
def compile(self) -> dict:
return {
"imports": "from agentscope.service import ServiceToolkit\n"
"from functools import partial\n"
"from agentscope.service import google_search",
"inits": f"{self.var_name} = partial(google_search,"
f" {kwarg_converter(self.opt_kwargs)})",
"execs": "",
}
[docs]
class PythonServiceNode(WorkflowNode):
"""
Execute python Node
"""
node_type = WorkflowNodeType.SERVICE
def __init__(
self,
node_id: str,
opt_kwargs: dict,
source_kwargs: dict,
dep_opts: list,
) -> None:
super().__init__(node_id, opt_kwargs, source_kwargs, dep_opts)
self.service_func = execute_python_code
[docs]
def compile(self) -> dict:
return {
"imports": "from agentscope.service import ServiceToolkit\n"
"from agentscope.service import execute_python_code",
"inits": f"{self.var_name} = execute_python_code",
"execs": "",
}
[docs]
class ReadTextServiceNode(WorkflowNode):
"""
Read Text Service Node
"""
node_type = WorkflowNodeType.SERVICE
def __init__(
self,
node_id: str,
opt_kwargs: dict,
source_kwargs: dict,
dep_opts: list,
) -> None:
super().__init__(node_id, opt_kwargs, source_kwargs, dep_opts)
self.service_func = read_text_file
[docs]
def compile(self) -> dict:
return {
"imports": "from agentscope.service import ServiceToolkit\n"
"from agentscope.service import read_text_file",
"inits": f"{self.var_name} = read_text_file",
"execs": "",
}
[docs]
class WriteTextServiceNode(WorkflowNode):
"""
Write Text Service Node
"""
node_type = WorkflowNodeType.SERVICE
def __init__(
self,
node_id: str,
opt_kwargs: dict,
source_kwargs: dict,
dep_opts: list,
) -> None:
super().__init__(node_id, opt_kwargs, source_kwargs, dep_opts)
self.service_func = write_text_file
[docs]
def compile(self) -> dict:
return {
"imports": "from agentscope.service import ServiceToolkit\n"
"from agentscope.service import write_text_file",
"inits": f"{self.var_name} = write_text_file",
"execs": "",
}
NODE_NAME_MAPPING = {
"dashscope_chat": ModelNode,
"openai_chat": ModelNode,
"post_api_chat": ModelNode,
"post_api_dall_e": ModelNode,
"Message": MsgNode,
"DialogAgent": DialogAgentNode,
"UserAgent": UserAgentNode,
"DictDialogAgent": DictDialogAgentNode,
"ReActAgent": ReActAgentNode,
"Placeholder": PlaceHolderNode,
"MsgHub": MsgHubNode,
"SequentialPipeline": SequentialPipelineNode,
"ForLoopPipeline": ForLoopPipelineNode,
"WhileLoopPipeline": WhileLoopPipelineNode,
"IfElsePipeline": IfElsePipelineNode,
"SwitchPipeline": SwitchPipelineNode,
"CopyNode": CopyNode,
"BingSearchService": BingSearchServiceNode,
"GoogleSearchService": GoogleSearchServiceNode,
"PythonService": PythonServiceNode,
"ReadTextService": ReadTextServiceNode,
"WriteTextService": WriteTextServiceNode,
}
[docs]
def get_all_agents(
node: WorkflowNode,
seen_agents: Optional[set] = None,
return_var: bool = False,
) -> List:
"""
Retrieve all unique agent objects from a pipeline.
Recursively traverses the pipeline to collect all distinct agent-based
participants. Prevents duplication by tracking already seen agents.
Args:
node (WorkflowNode): The WorkflowNode from which to extract agents.
seen_agents (set, optional): A set of agents that have already been
seen to avoid duplication. Defaults to None.
Returns:
list: A list of unique agent objects found in the pipeline.
"""
if seen_agents is None:
seen_agents = set()
all_agents = []
for participant in node.pipeline.participants:
if participant.node_type == WorkflowNodeType.AGENT:
if participant not in seen_agents:
if return_var:
all_agents.append(participant.var_name)
else:
all_agents.append(participant.pipeline)
seen_agents.add(participant.pipeline)
elif participant.node_type == WorkflowNodeType.PIPELINE:
nested_agents = get_all_agents(
participant,
seen_agents,
return_var=return_var,
)
all_agents.extend(nested_agents)
else:
raise TypeError(type(participant))
return all_agents